diff --git a/tiledb/sm/global_state/global_state.cc b/tiledb/sm/global_state/global_state.cc index ff215a277d8..92a8d4fe873 100644 --- a/tiledb/sm/global_state/global_state.cc +++ b/tiledb/sm/global_state/global_state.cc @@ -50,6 +50,8 @@ namespace tiledb { namespace sm { namespace global_state { +extern int tbb_nthreads_; + GlobalState& GlobalState::GetGlobalState() { // This is thread-safe in C++11. static GlobalState globalState; @@ -120,6 +122,14 @@ std::set GlobalState::storage_managers() { const std::string& GlobalState::cert_file() { return cert_file_; } + +int GlobalState::tbb_threads() { + if (!initialized_) + return 0; + + return tbb_nthreads_; +} + } // namespace global_state } // namespace sm } // namespace tiledb diff --git a/tiledb/sm/global_state/global_state.h b/tiledb/sm/global_state/global_state.h index 6bda08fd3a9..03f4e857c33 100644 --- a/tiledb/sm/global_state/global_state.h +++ b/tiledb/sm/global_state/global_state.h @@ -92,6 +92,13 @@ class GlobalState { */ const std::string& cert_file(); + /** + * Gets the number of threads that TBB was initialized with. Returns + * 0 if TBB is disabled or the global state has not been initialized. + * @return the number of configured TBB threads. + */ + int tbb_threads(); + private: /** The TileDB configuration parameters. 
*/ Config config_; diff --git a/tiledb/sm/global_state/tbb_state.cc b/tiledb/sm/global_state/tbb_state.cc index f7aa94efc56..b52d41f3611 100644 --- a/tiledb/sm/global_state/tbb_state.cc +++ b/tiledb/sm/global_state/tbb_state.cc @@ -50,7 +50,7 @@ namespace global_state { static std::unique_ptr tbb_scheduler_; /** The number of TBB threads the scheduler was configured with **/ -static int tbb_nthreads_; +int tbb_nthreads_; Status init_tbb(const Config* config) { int nthreads; @@ -95,6 +95,7 @@ Status init_tbb(const Config* config) { return Status::Error(msg.str()); } } + return Status::Ok(); } @@ -108,6 +109,8 @@ namespace tiledb { namespace sm { namespace global_state { +int tbb_nthreads_ = 0; + Status init_tbb(const Config* config) { (void)config; return Status::Ok(); diff --git a/tiledb/sm/query/reader.cc b/tiledb/sm/query/reader.cc index 88c3e8cbb9f..5e78c2c7945 100644 --- a/tiledb/sm/query/reader.cc +++ b/tiledb/sm/query/reader.cc @@ -36,6 +36,7 @@ #include "tiledb/sm/array_schema/dimension.h" #include "tiledb/sm/filesystem/vfs.h" #include "tiledb/sm/fragment/fragment_metadata.h" +#include "tiledb/sm/global_state/global_state.h" #include "tiledb/sm/misc/comparators.h" #include "tiledb/sm/misc/logger.h" #include "tiledb/sm/misc/parallel_functions.h" @@ -878,65 +879,132 @@ Status Reader::compute_sparse_result_tiles( STATS_END_TIMER(stats::Stats::TimerType::READ_COMPUTE_SPARSE_RESULT_TILES) } -Status Reader::copy_cells( - const std::string& attribute, +Status Reader::copy_fixed_cells( + const std::string& name, uint64_t stride, const std::vector& result_cell_slabs, - CopyCellsContextCache* const ctx_cache) { + CopyFixedCellsContextCache* const ctx_cache) { assert(ctx_cache); + auto stat_type = (array_schema_->is_attr(name)) ? 
+ stats::Stats::TimerType::READ_COPY_FIXED_ATTR_VALUES : + stats::Stats::TimerType::READ_COPY_FIXED_COORDS; + STATS_START_TIMER(stat_type) + if (result_cell_slabs.empty()) { zero_out_buffer_sizes(); return Status::Ok(); } - if (array_schema_->var_size(attribute)) - return copy_var_cells(attribute, stride, result_cell_slabs, ctx_cache); - return copy_fixed_cells(attribute, stride, result_cell_slabs); -} - -Status Reader::copy_fixed_cells( - const std::string& name, - uint64_t stride, - const std::vector& result_cell_slabs) { - auto stat_type = (array_schema_->is_attr(name)) ? - stats::Stats::TimerType::READ_COPY_FIXED_ATTR_VALUES : - stats::Stats::TimerType::READ_COPY_FIXED_COORDS; - STATS_START_TIMER(stat_type) + // Perform a lazy initialization of the context cache for copying + // fixed cells. + populate_cfc_ctx_cache(result_cell_slabs, ctx_cache); - // For easy reference auto it = buffers_.find(name); - auto buffer = (unsigned char*)it->second.buffer_; auto buffer_size = it->second.buffer_size_; auto cell_size = array_schema_->cell_size(name); - // Precompute the cell range destination offsets in the buffer - auto num_cs = result_cell_slabs.size(); + // Precompute the cell range destination offsets in the buffer. uint64_t buffer_offset = 0; - std::vector cs_offsets(num_cs); - for (uint64_t i = 0; i < num_cs; i++) { + for (uint64_t i = 0; i < ctx_cache->cs_offsets.size(); i++) { const auto& cs = result_cell_slabs[i]; auto bytes_to_copy = cs.length_ * cell_size; - cs_offsets[i] = buffer_offset; + ctx_cache->cs_offsets[i] = buffer_offset; buffer_offset += bytes_to_copy; } - // Handle overflow + // Handle overflow. if (buffer_offset > *buffer_size) { read_state_.overflowed_ = true; return Status::Ok(); } - // Copy cell ranges in parallel. 
- auto statuses = parallel_for(0, num_cs, [&](uint64_t i) { - const auto& cs = result_cell_slabs[i]; - uint64_t offset = cs_offsets[i]; - // Check for overflow - assert(offset + cs.length_ * cell_size <= *buffer_size); + // Copy result cell slabs in parallel. + std::function copy_fn = std::bind( + &Reader::copy_partitioned_fixed_cells, + this, + std::placeholders::_1, + &name, + stride, + &result_cell_slabs, + ctx_cache); + auto statuses = + parallel_for(0, ctx_cache->cs_partitions.size(), std::move(copy_fn)); + + for (auto st : statuses) + RETURN_NOT_OK(st); + + // Update buffer offsets + *(buffers_[name].buffer_size_) = buffer_offset; + + return Status::Ok(); + + STATS_END_TIMER(stat_type) +} + +void Reader::populate_cfc_ctx_cache( + const std::vector& result_cell_slabs, + CopyFixedCellsContextCache* const ctx_cache) { + auto cs_offsets = &ctx_cache->cs_offsets; + auto cs_partitions = &ctx_cache->cs_partitions; + + // If `ctx_cache` is already populated, we're done. + if (!cs_offsets->empty()) { + return; + } + + // Allocate and resize `cs_offsets`. + auto num_cs = result_cell_slabs.size(); + cs_offsets->resize(num_cs); + + // Partition the range of the `result_cell_slab` into + // individual cell ranges for each TBB thread. + const int tbb_threads = + global_state::GlobalState::GetGlobalState().tbb_threads() > 0 ? + global_state::GlobalState::GetGlobalState().tbb_threads() : + 1; + const uint64_t num_cs_partitions = std::min(tbb_threads, num_cs); + const uint64_t cs_per_partition = num_cs / num_cs_partitions; + const uint64_t cs_per_partition_carry = num_cs % num_cs_partitions; + + // Calculate the partition offsets into `cs_offsets`. + uint64_t num_cs_partitioned = 0; + for (uint64_t i = 0; i < num_cs_partitions; ++i) { + const uint64_t num_cs_in_partition = + cs_per_partition + ((i < cs_per_partition_carry) ? 
1 : 0); + num_cs_partitioned += num_cs_in_partition; + cs_partitions->emplace_back(num_cs_partitioned); + } +} + +Status Reader::copy_partitioned_fixed_cells( + const size_t partition_idx, + const std::string* const name, + const uint64_t stride, + const std::vector* const result_cell_slabs, + const CopyFixedCellsContextCache* const ctx_cache) { + assert(name); + assert(result_cell_slabs); + assert(ctx_cache); + + // For easy reference. + auto it = buffers_.find(*name); + auto buffer = (unsigned char*)it->second.buffer_; + auto cell_size = array_schema_->cell_size(*name); + + // Calculate the partition to operate on. + const uint64_t cs_idx_start = + partition_idx == 0 ? 0 : ctx_cache->cs_partitions[partition_idx - 1]; + const uint64_t cs_idx_end = ctx_cache->cs_partitions[partition_idx]; + + // Copy the cells. + for (uint64_t cs_idx = cs_idx_start; cs_idx < cs_idx_end; ++cs_idx) { + const auto& cs = (*result_cell_slabs)[cs_idx]; + uint64_t offset = ctx_cache->cs_offsets[cs_idx]; // Copy if (cs.tile_ == nullptr) { // Empty range - auto type = array_schema_->type(name); + auto type = array_schema_->type(*name); auto fill_size = datatype_size(type); auto fill_value = constants::fill_value(type); assert(fill_value != nullptr); @@ -949,36 +1017,27 @@ Status Reader::copy_fixed_cells( } else { // Non-empty range if (stride == UINT64_MAX) { RETURN_NOT_OK( - cs.tile_->read(name, buffer + offset, cs.start_, cs.length_)); + cs.tile_->read(*name, buffer + offset, cs.start_, cs.length_)); } else { auto cell_offset = offset; auto start = cs.start_; for (uint64_t j = 0; j < cs.length_; ++j) { - RETURN_NOT_OK(cs.tile_->read(name, buffer + cell_offset, start, 1)); + RETURN_NOT_OK(cs.tile_->read(*name, buffer + cell_offset, start, 1)); cell_offset += cell_size; start += stride; } } } - return Status::Ok(); - }); - - for (auto st : statuses) - RETURN_NOT_OK(st); - - // Update buffer offsets - *(buffers_[name].buffer_size_) = buffer_offset; + } return Status::Ok(); - - 
STATS_END_TIMER(stat_type) } Status Reader::copy_var_cells( const std::string& name, uint64_t stride, const std::vector& result_cell_slabs, - CopyCellsContextCache* const ctx_cache) { + CopyVarCellsContextCache* const ctx_cache) { assert(ctx_cache); auto stat_type = (array_schema_->is_attr(name)) ? @@ -986,31 +1045,30 @@ Status Reader::copy_var_cells( stats::Stats::TimerType::READ_COPY_VAR_COORDS; STATS_START_TIMER(stat_type); - // For easy reference - auto it = buffers_.find(name); - auto buffer = (unsigned char*)it->second.buffer_; - auto buffer_var = (unsigned char*)it->second.buffer_var_; - auto buffer_size = it->second.buffer_size_; - auto buffer_var_size = it->second.buffer_var_size_; - uint64_t offset_size = constants::cell_var_offset_size; - auto type = array_schema_->type(name); - auto fill_size = datatype_size(type); - auto fill_value = constants::fill_value(type); - assert(fill_value != nullptr); + if (result_cell_slabs.empty()) { + zero_out_buffer_sizes(); + return Status::Ok(); + } + + // Perform a lazy initialization of the context cache for copying + // var cells. + populate_cvc_ctx_cache(result_cell_slabs, ctx_cache); // Compute the destinations of offsets and var-len data in the buffers. - auto offset_offsets_per_cs = &ctx_cache->offset_offsets_per_cs; - auto var_offsets_per_cs = &ctx_cache->var_offsets_per_cs; uint64_t total_offset_size, total_var_size; RETURN_NOT_OK(compute_var_cell_destinations( name, stride, result_cell_slabs, - offset_offsets_per_cs, - var_offsets_per_cs, + &ctx_cache->offset_offsets_per_cs, + &ctx_cache->var_offsets_per_cs, &total_offset_size, &total_var_size)); + auto it = buffers_.find(name); + auto buffer_size = it->second.buffer_size_; + auto buffer_var_size = it->second.buffer_var_size_; + // Check for overflow and return early (without copying) in that case. 
if (total_offset_size > *buffer_size || total_var_size > *buffer_var_size) { read_state_.overflowed_ = true; @@ -1018,60 +1076,15 @@ Status Reader::copy_var_cells( } // Copy result cell slabs in parallel - const auto num_cs = result_cell_slabs.size(); - auto statuses = parallel_for(0, num_cs, [&](uint64_t cs_idx) { - const auto& cs = result_cell_slabs[cs_idx]; - const auto& offset_offsets = (*offset_offsets_per_cs)[cs_idx]; - const auto& var_offsets = (*var_offsets_per_cs)[cs_idx]; - - // Get tile information, if the range is nonempty. - uint64_t* tile_offsets = nullptr; - Tile* tile_var = nullptr; - uint64_t tile_cell_num = 0; - if (cs.tile_ != nullptr) { - const auto tile_pair = cs.tile_->tile_pair(name); - Tile* const tile = &tile_pair->first; - tile_var = &tile_pair->second; - // Get the internal buffer to the offset values. - ChunkedBuffer* const chunked_buffer = tile->chunked_buffer(); - - // Offset tiles are always contiguously allocated. - assert( - chunked_buffer->buffer_addressing() == - ChunkedBuffer::BufferAddressing::CONTIGUOUS); - - tile_offsets = (uint64_t*)chunked_buffer->get_contiguous_unsafe(); - tile_cell_num = tile->cell_num(); - } - - // Copy each cell in the range - uint64_t dest_vec_idx = 0; - stride = (stride == UINT64_MAX) ? 1 : stride; - for (auto cell_idx = cs.start_; dest_vec_idx < cs.length_; - cell_idx += stride, dest_vec_idx++) { - auto offset_dest = buffer + offset_offsets[dest_vec_idx]; - auto var_offset = var_offsets[dest_vec_idx]; - auto var_dest = buffer_var + var_offset; - - // Copy offset - std::memcpy(offset_dest, &var_offset, offset_size); - - // Copy variable-sized value - if (cs.tile_ == nullptr) { - std::memcpy(var_dest, &fill_value, fill_size); - } else { - const uint64_t cell_var_size = - (cell_idx != tile_cell_num - 1) ? 
- tile_offsets[cell_idx + 1] - tile_offsets[cell_idx] : - tile_var->size() - (tile_offsets[cell_idx] - tile_offsets[0]); - const uint64_t tile_var_offset = - tile_offsets[cell_idx] - tile_offsets[0]; - RETURN_NOT_OK(tile_var->read(var_dest, cell_var_size, tile_var_offset)); - } - } - - return Status::Ok(); - }); + std::function copy_fn = std::bind( + &Reader::copy_partitioned_var_cells, + this, + std::placeholders::_1, + &name, + stride, + &result_cell_slabs, + ctx_cache); + auto statuses = parallel_for(0, ctx_cache->cs_partitions.size(), copy_fn); // Check all statuses for (auto st : statuses) @@ -1086,12 +1099,78 @@ Status Reader::copy_var_cells( STATS_END_TIMER(stat_type); } +void Reader::populate_cvc_ctx_cache( + const std::vector& result_cell_slabs, + CopyVarCellsContextCache* const ctx_cache) { + assert(ctx_cache); + + auto cs_partitions = &ctx_cache->cs_partitions; + auto offset_offsets_per_cs = &ctx_cache->offset_offsets_per_cs; + auto var_offsets_per_cs = &ctx_cache->var_offsets_per_cs; + + // If `ctx_cache` is already populated, we're done. + if (!cs_partitions->empty()) { + return; + } + + // Calculate the partition range. + const size_t num_cs = result_cell_slabs.size(); + const int tbb_threads = + global_state::GlobalState::GetGlobalState().tbb_threads() > 0 ? + global_state::GlobalState::GetGlobalState().tbb_threads() : + 1; + const uint64_t num_cs_partitions = std::min(tbb_threads, num_cs); + const uint64_t cs_per_partition = num_cs / num_cs_partitions; + const uint64_t cs_per_partition_carry = num_cs % num_cs_partitions; + + // Allocate space for all of the partitions. + cs_partitions->reserve(num_cs_partitions); + + // Compute the boundary between each partition. Each boundary + // is represented by an `std::pair` that contains the total + // length of each cell slab in the leading partition and an + // exclusive cell slab index that ends the partition. 
+ size_t total_cs_length = 0; + uint64_t next_partition_idx = cs_per_partition; + if (cs_per_partition_carry > 0) + ++next_partition_idx; + + for (uint64_t cs_idx = 0; cs_idx < num_cs; cs_idx++) { + if (cs_idx == next_partition_idx) { + cs_partitions->emplace_back(total_cs_length, cs_idx); + + // The final partition may contain extra cell slabs that did + // not evenly divide into the partition range. Set the + // `next_partition_idx` to zero and build the last boundary + // after this for-loop. + if (cs_partitions->size() == num_cs_partitions) { + next_partition_idx = 0; + } else { + next_partition_idx += cs_per_partition; + if (cs_idx < (cs_per_partition_carry - 1)) + ++next_partition_idx; + } + } + + total_cs_length += result_cell_slabs[cs_idx].length_; + } + + // Store the final boundary. + cs_partitions->emplace_back(total_cs_length, num_cs); + + // Allocate and size both `offset_offsets_per_cs` and + // `var_offsets_per_cs` to contain enough elements for + // storing any result cell. 
+ offset_offsets_per_cs->resize(total_cs_length); + var_offsets_per_cs->resize(total_cs_length); +} + Status Reader::compute_var_cell_destinations( const std::string& name, uint64_t stride, const std::vector& result_cell_slabs, - std::vector>* offset_offsets_per_cs, - std::vector>* var_offsets_per_cs, + std::vector* offset_offsets_per_cs, + std::vector* var_offsets_per_cs, uint64_t* total_offset_size, uint64_t* total_var_size) const { // For easy reference @@ -1100,17 +1179,12 @@ Status Reader::compute_var_cell_destinations( auto type = array_schema_->type(name); auto fill_size = datatype_size(type); - // Resize the output vectors - offset_offsets_per_cs->resize(num_cs); - var_offsets_per_cs->resize(num_cs); - // Compute the destinations for all result cell slabs *total_offset_size = 0; *total_var_size = 0; + size_t total_cs_length = 0; for (uint64_t cs_idx = 0; cs_idx < num_cs; cs_idx++) { const auto& cs = result_cell_slabs[cs_idx]; - (*offset_offsets_per_cs)[cs_idx].resize(cs.length_); - (*var_offsets_per_cs)[cs_idx].resize(cs.length_); // Get tile information, if the range is nonempty. uint64_t* tile_offsets = nullptr; @@ -1151,11 +1225,105 @@ Status Reader::compute_var_cell_destinations( } // Record destination offsets. 
- (*offset_offsets_per_cs)[cs_idx][dest_vec_idx] = *total_offset_size; - (*var_offsets_per_cs)[cs_idx][dest_vec_idx] = *total_var_size; + (*offset_offsets_per_cs)[total_cs_length + dest_vec_idx] = + *total_offset_size; + (*var_offsets_per_cs)[total_cs_length + dest_vec_idx] = *total_var_size; *total_offset_size += offset_size; *total_var_size += cell_var_size; } + + total_cs_length += cs.length_; + } + + return Status::Ok(); +} + +Status Reader::copy_partitioned_var_cells( + const size_t partition_idx, + const std::string* const name, + uint64_t stride, + const std::vector* const result_cell_slabs, + CopyVarCellsContextCache* const ctx_cache) { + assert(name); + assert(result_cell_slabs); + assert(ctx_cache); + + auto it = buffers_.find(*name); + auto buffer = (unsigned char*)it->second.buffer_; + auto buffer_var = (unsigned char*)it->second.buffer_var_; + uint64_t offset_size = constants::cell_var_offset_size; + auto type = array_schema_->type(*name); + auto fill_size = datatype_size(type); + auto fill_value = constants::fill_value(type); + assert(fill_value != nullptr); + auto cs_partitions = &ctx_cache->cs_partitions; + auto offset_offsets_per_cs = &ctx_cache->offset_offsets_per_cs; + auto var_offsets_per_cs = &ctx_cache->var_offsets_per_cs; + + // Fetch the starting array offset into both `offset_offsets_per_cs` + // and `var_offsets_per_cs`. + size_t arr_offset = + partition_idx == 0 ? 0 : (*cs_partitions)[partition_idx - 1].first; + + // Fetch the inclusive starting cell slab index and the exclusive ending + // cell slab index. + const size_t start_cs_idx = + partition_idx == 0 ? 0 : (*cs_partitions)[partition_idx - 1].second; + const size_t end_cs_idx = (*cs_partitions)[partition_idx].second; + + // Copy all cells within the range of cell slabs. + for (uint64_t cs_idx = start_cs_idx; cs_idx < end_cs_idx; ++cs_idx) { + const auto& cs = (*result_cell_slabs)[cs_idx]; + + // Get tile information, if the range is nonempty. 
+ uint64_t* tile_offsets = nullptr; + Tile* tile_var = nullptr; + uint64_t tile_cell_num = 0; + if (cs.tile_ != nullptr) { + const auto tile_pair = cs.tile_->tile_pair(*name); + Tile* const tile = &tile_pair->first; + tile_var = &tile_pair->second; + // Get the internal buffer to the offset values. + ChunkedBuffer* const chunked_buffer = tile->chunked_buffer(); + + // Offset tiles are always contiguously allocated. + assert( + chunked_buffer->buffer_addressing() == + ChunkedBuffer::BufferAddressing::CONTIGUOUS); + + tile_offsets = (uint64_t*)chunked_buffer->get_contiguous_unsafe(); + tile_cell_num = tile->cell_num(); + } + + // Copy each cell in the range + uint64_t dest_vec_idx = 0; + stride = (stride == UINT64_MAX) ? 1 : stride; + for (auto cell_idx = cs.start_; dest_vec_idx < cs.length_; + cell_idx += stride, dest_vec_idx++) { + auto offset_dest = + buffer + (*offset_offsets_per_cs)[arr_offset + dest_vec_idx]; + // offset_offsets[dest_vec_idx]; + auto var_offset = (*var_offsets_per_cs)[arr_offset + dest_vec_idx]; + auto var_dest = buffer_var + var_offset; + + // Copy offset + std::memcpy(offset_dest, &var_offset, offset_size); + + // Copy variable-sized value + if (cs.tile_ == nullptr) { + std::memcpy(var_dest, &fill_value, fill_size); + } else { + const uint64_t cell_var_size = + (cell_idx != tile_cell_num - 1) ? 
+ tile_offsets[cell_idx + 1] - tile_offsets[cell_idx] : + tile_var->size() - (tile_offsets[cell_idx] - tile_offsets[0]); + const uint64_t tile_var_offset = + tile_offsets[cell_idx] - tile_offsets[0]; + RETURN_NOT_OK(tile_var->read(var_dest, cell_var_size, tile_var_offset)); + } + } + + arr_offset += cs.length_; } return Status::Ok(); @@ -2200,11 +2368,15 @@ Status Reader::copy_coordinates( const std::vector& result_cell_slabs) { STATS_START_TIMER(stats::Stats::TimerType::READ_COPY_COORDS); - uint64_t stride = UINT64_MAX; - - CopyCellsContextCache ctx_cache; + const uint64_t stride = UINT64_MAX; - // Copy coordinates + // Build a lists of coordinate names to copy, separating them by + // whether they are of fixed or variable length. The motivation + // is that copying fixed and variable cells require two different + // context caches. Processing them separately allows us to maintain + // a single context cache at the same time to reduce memory use. + std::vector fixed_names; + std::vector var_names; for (const auto& it : buffers_) { const auto& name = it.first; if (read_state_.overflowed_) @@ -2212,9 +2384,30 @@ Status Reader::copy_coordinates( if (!(name == constants::coords || array_schema_->is_dim(name))) continue; - RETURN_CANCEL_OR_ERROR( - copy_cells(name, stride, result_cell_slabs, &ctx_cache)); - clear_tiles(name, result_tiles); + if (array_schema_->var_size(name)) + var_names.emplace_back(name); + else + fixed_names.emplace_back(name); + } + + // Copy result cells for fixed-sized coordinates. + if (!fixed_names.empty()) { + CopyFixedCellsContextCache ctx_cache; + for (const auto& name : fixed_names) { + RETURN_CANCEL_OR_ERROR( + copy_fixed_cells(name, stride, result_cell_slabs, &ctx_cache)); + clear_tiles(name, result_tiles); + } + } + + // Copy result cells for var-sized coordinates. 
+ if (!var_names.empty()) { + CopyVarCellsContextCache ctx_cache; + for (const auto& name : var_names) { + RETURN_CANCEL_OR_ERROR( + copy_var_cells(name, stride, result_cell_slabs, &ctx_cache)); + clear_tiles(name, result_tiles); + } } return Status::Ok(); @@ -2245,9 +2438,13 @@ Status Reader::copy_attribute_values( ++it; } - CopyCellsContextCache ctx_cache; - - // Copy result cells only for the attributes + // Build a lists of attribute names to copy, separating them by + // whether they are of fixed or variable length. The motivation + // is that copying fixed and variable cells require two different + // context caches. Processing them separately allows us to maintain + // a single context cache at the same time to reduce memory use. + std::vector fixed_names; + std::vector var_names; for (const auto& it : buffers_) { const auto& name = it.first; if (read_state_.overflowed_) @@ -2255,11 +2452,34 @@ Status Reader::copy_attribute_values( if (name == constants::coords || array_schema_->is_dim(name)) continue; - RETURN_CANCEL_OR_ERROR(read_tiles(name, result_tiles)); - RETURN_CANCEL_OR_ERROR(unfilter_tiles(name, result_tiles, &cs_ranges)); - RETURN_CANCEL_OR_ERROR( - copy_cells(name, stride, result_cell_slabs, &ctx_cache)); - clear_tiles(name, result_tiles); + if (array_schema_->var_size(name)) + var_names.emplace_back(name); + else + fixed_names.emplace_back(name); + } + + // Copy result cells for fixed-sized attributes. + if (!fixed_names.empty()) { + CopyFixedCellsContextCache ctx_cache; + for (const auto& name : fixed_names) { + RETURN_CANCEL_OR_ERROR(read_tiles(name, result_tiles)); + RETURN_CANCEL_OR_ERROR(unfilter_tiles(name, result_tiles, &cs_ranges)); + RETURN_CANCEL_OR_ERROR( + copy_fixed_cells(name, stride, result_cell_slabs, &ctx_cache)); + clear_tiles(name, result_tiles); + } + } + + // Copy result cells for var-sized attributes. 
+ if (!var_names.empty()) { + CopyVarCellsContextCache ctx_cache; + for (const auto& name : var_names) { + RETURN_CANCEL_OR_ERROR(read_tiles(name, result_tiles)); + RETURN_CANCEL_OR_ERROR(unfilter_tiles(name, result_tiles, &cs_ranges)); + RETURN_CANCEL_OR_ERROR( + copy_var_cells(name, stride, result_cell_slabs, &ctx_cache)); + clear_tiles(name, result_tiles); + } } return Status::Ok(); diff --git a/tiledb/sm/query/reader.h b/tiledb/sm/query/reader.h index 16ef0f1cf2a..ae074e408df 100644 --- a/tiledb/sm/query/reader.h +++ b/tiledb/sm/query/reader.h @@ -496,17 +496,38 @@ class Reader { /* PRIVATE DATATYPES */ /* ********************************* */ - /** - * Contains data structures for re-use between invocations of - * `copy_cells`. The intent is to reduce CPU time spent allocating - * and deallocating expensive data structures. - */ - struct CopyCellsContextCache { - /** An input for `compute_var_cell_destinations`. */ - std::vector> offset_offsets_per_cs; + struct CopyFixedCellsContextCache { + /** + * Maps the index of each cell slab to its offset in the + * output buffer. + */ + std::vector cs_offsets; + + /** + * Logical partitions of `cs_offsets`. Each element is the + * partition's starting index. + */ + std::vector cs_partitions; + }; + + struct CopyVarCellsContextCache { + /** + * Maps each cell slab to its offset for its attribute offsets. + */ + std::vector offset_offsets_per_cs; + + /** + * Maps each cell slab to its offset for its variable-length data. + */ + std::vector var_offsets_per_cs; - /** An input for `compute_var_cell_destinations`. */ - std::vector> var_offsets_per_cs; + /** + * Logical partitions of both `offset_offsets_per_cs` and + * `var_offsets_per_cs`. Each element contains a pair, where the + * first pair-element is the partition's starting index and the + * second pair-element is the number of cell slabs in the partition. 
+ */ + std::vector> cs_partitions; }; /* ********************************* */ @@ -696,10 +717,10 @@ class Reader { std::vector* single_fragment); /** - * Copies the cells for the input attribute and result cell slabs, into - * the corresponding result buffers. + * Copies the cells for the input **fixed-sized** attribute/dimension and + * result cell slabs into the corresponding result buffers. * - * @param attribute The targeted attribute. + * @param name The targeted attribute/dimension. * @param stride If it is `UINT64_MAX`, then the cells in the result * cell slabs are all contiguous. Otherwise, each cell in the * result cell slabs are `stride` cells apart from each other. @@ -708,31 +729,46 @@ class Reader { * calls to improve performance. * @return Status */ - Status copy_cells( - const std::string& attribute, + Status copy_fixed_cells( + const std::string& name, uint64_t stride, const std::vector& result_cell_slabs, - CopyCellsContextCache* ctx_cache); + CopyFixedCellsContextCache* ctx_cache); + + /** + * Populates 'ctx_cache' for fixed-sized cell copying. + * + * @param result_cell_slabs The result cell slabs to copy cells for. + * @param ctx_cache The context cache to populate. + */ + void populate_cfc_ctx_cache( + const std::vector& result_cell_slabs, + CopyFixedCellsContextCache* ctx_cache); /** * Copies the cells for the input **fixed-sized** attribute/dimension and - * result cell slabs, into the corresponding result buffers. + * result cell slabs into the corresponding result buffers for the + * partition in `ctx_cache` at index `partition_idx`. * - * @param name The targeted attribute/diemnsion. + * @param partition_idx The partition index. + * @param name The targeted attribute/dimension. * @param stride If it is `UINT64_MAX`, then the cells in the result * cell slabs are all contiguous. Otherwise, each cell in the * result cell slabs are `stride` cells apart from each other. * @param result_cell_slabs The result cell slabs to copy cells for. 
+ * @param ctx_cache The context cache containing partition information. + * @return Status */ - Status copy_fixed_cells( - const std::string& name, + Status copy_partitioned_fixed_cells( + size_t partition_idx, + const std::string* name, uint64_t stride, - const std::vector& result_cell_slabs); + const std::vector* result_cell_slabs, + const CopyFixedCellsContextCache* ctx_cache); /** * Copies the cells for the input **var-sized** attribute/dimension and result - * cell slabs, into the corresponding result buffers. + * cell slabs into the corresponding result buffers. * * @param name The targeted attribute/dimension. * @param stride If it is `UINT64_MAX`, then the cells in the result @@ -747,7 +783,17 @@ class Reader { const std::string& name, uint64_t stride, const std::vector& result_cell_slabs, - CopyCellsContextCache* ctx_cache); + CopyVarCellsContextCache* ctx_cache); + + /** + * Populates 'ctx_cache' for var-sized cell copying. + * + * @param result_cell_slabs The result cell slabs to copy cells for. + * @param ctx_cache The context cache to populate. + */ + void populate_cvc_ctx_cache( + const std::vector& result_cell_slabs, + CopyVarCellsContextCache* ctx_cache); /** * Computes offsets into destination buffers for the given @@ -775,11 +821,32 @@ class Reader { const std::string& name, uint64_t stride, const std::vector& result_cell_slabs, - std::vector>* offset_offsets_per_cs, - std::vector>* var_offsets_per_cs, + std::vector* offset_offsets_per_cs, + std::vector* var_offsets_per_cs, uint64_t* total_offset_size, uint64_t* total_var_size) const; + /** + * Copies the cells for the input **var-sized** attribute/dimension and result + * cell slabs into the corresponding result buffers for the + * partition in `ctx_cache` at index `partition_idx`. + * + * @param partition_idx The partition index. + * @param name The targeted attribute/dimension. + * @param stride If it is `UINT64_MAX`, then the cells in the result + * cell slabs are all contiguous. 
Otherwise, each cell in the + * result cell slabs are `stride` cells apart from each other. + * @param result_cell_slabs The result cell slabs to copy cells for. + * @param ctx_cache The context cache containing partition information. + * @return Status + */ + Status copy_partitioned_var_cells( + size_t partition_idx, + const std::string* name, + uint64_t stride, + const std::vector* result_cell_slabs, + CopyVarCellsContextCache* ctx_cache); + /** * Computes the result space tiles based on the input subarray. *