Reader::compute_result_coords sub-partitioner (#1726)
This patch introduces a sub-partitioner to compute and sort the range
result coords. The motivation is to partition the elements sorted in
parallel_sort (which runs in O(N log N) time), so that each sort
operates on a smaller N. In the scenario that I'm currently testing,
this improves multi-fragment reads from ~2.9s to ~2.4s; the
single-fragment read runs in ~1.8s. This reduces the multi-fragment
read overhead relative to the single-fragment read from 61%
(2.9 / 1.8 ≈ 1.61) to 33% (2.4 / 1.8 ≈ 1.33).
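
As a rough illustration of the complexity argument (a minimal,
standalone sketch rather than the patch's actual code; the element type
and chunk count are placeholders), sorting P independent partitions of
size N/P costs roughly N * log(N/P) comparisons instead of N * log(N):

```
#include <algorithm>
#include <cstdint>
#include <vector>

// Sort `values` in `partitions` (>= 1) independent chunks. When the
// chunks are already ordered relative to one another (as the
// sub-partitions produced by the SubarrayPartitioner are), the full
// vector ends up sorted.
void sort_in_partitions(std::vector<uint64_t>* values, size_t partitions) {
  const size_t n = values->size();
  const size_t chunk = (n + partitions - 1) / partitions;  // ceil(n / P)
  for (size_t begin = 0; begin < n; begin += chunk) {
    const size_t end = std::min(begin + chunk, n);
    std::sort(values->begin() + begin, values->begin() + end);
  }
}
```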

// Single-fragment
```
- Read time: 1.81197 secs
  * Time to compute next partition: 0.059505 secs
  * Time to compute result coordinates: 0.427879 secs
    > Time to compute sparse result tiles: 0.00213199 secs
    > Time to read coordinate tiles: 0.0205255 secs
    > Time to unfilter coordinate tiles: 0.0236447 secs
    > Time to compute range result coordinates: 0.126082 secs
  * Time to compute sparse result cell slabs: 0.0142938 secs
  * Time to copy result attribute values: 1.27557 secs
    > Time to read attribute tiles: 0.325302 secs
    > Time to unfilter attribute tiles: 0.376115 secs
    > Time to copy fixed-sized attribute values: 0.324924 secs
    > Time to copy var-sized attribute values: 0.146791 secs
  * Time to copy result coordinates: 0.0475786 secs
    > Time to copy fixed-sized coordinates: 0.0238743 secs

- Total read query time (array open + init state + read): 1.81201 secs
```

// Multi-fragment
```
- Read time: 2.40146 secs
  * Time to compute next partition: 0.0846776 secs
  * Time to compute result coordinates: 0.701766 secs
    > Time to compute sparse result tiles: 0.00235241 secs
    > Time to read coordinate tiles: 0.0225933 secs
    > Time to unfilter coordinate tiles: 0.0239695 secs
    > Time to compute range result coordinates: 0.101788 secs
  * Time to compute sparse result cell slabs: 0.0431639 secs
  * Time to copy result attribute values: 1.56159 secs
    > Time to read attribute tiles: 0.338785 secs
    > Time to unfilter attribute tiles: 0.397662 secs
    > Time to copy fixed-sized attribute values: 0.356104 secs
    > Time to copy var-sized attribute values: 0.313703 secs
  * Time to copy result coordinates: 0.0425039 secs
    > Time to copy fixed-sized coordinates: 0.0271955 secs

- Total read query time (array open + init state + read): 2.4015 secs
```

// Multi-fragment (without this patch)
```
- Read time: 2.91803 secs
  * Time to compute next partition: 0.0418832 secs
  * Time to compute result coordinates: 0.988186 secs
    > Time to compute sparse result tiles: 0.00240205 secs
    > Time to read coordinate tiles: 0.0225808 secs
    > Time to unfilter coordinate tiles: 0.0243308 secs
    > Time to compute range result coordinates: 0.287352 secs
  * Time to compute sparse result cell slabs: 0.0378204 secs
  * Time to copy result attribute values: 1.76864 secs
    > Time to read attribute tiles: 0.343348 secs
    > Time to unfilter attribute tiles: 0.41717 secs
    > Time to copy fixed-sized attribute values: 0.436063 secs
    > Time to copy var-sized attribute values: 0.386814 secs
  * Time to copy result coordinates: 0.0684334 secs
    > Time to copy fixed-sized coordinates: 0.0382354 secs

- Total read query time (array open + init state + read): 2.91808 secs
```

Co-authored-by: Joe Maley <[email protected]>
joe maley and Joe Maley authored Jul 17, 2020
1 parent cbcc0f2 commit 25fcb22
Showing 4 changed files with 132 additions and 33 deletions.
3 changes: 3 additions & 0 deletions tiledb/sm/misc/constants.cc
```
@@ -481,6 +481,9 @@ const std::string special_name_prefix = "__";
/** Number of milliseconds between watchdog thread wakeups. */
const unsigned watchdog_thread_sleep_ms = 1000;

/** The target sub-partitioner budget for computing result coordinates. */
const uint64_t sub_partitioner_memory_budget = 1024 * 1024 * 5;

const void* fill_value(Datatype type) {
switch (type) {
case Datatype::INT8:
```
3 changes: 3 additions & 0 deletions tiledb/sm/misc/constants.h
```
@@ -472,6 +472,9 @@ extern const std::string special_name_prefix;
/** Number of milliseconds between watchdog thread wakeups. */
extern const unsigned watchdog_thread_sleep_ms;

/** The target sub-partitioner budget for computing result coordinates. */
extern const uint64_t sub_partitioner_memory_budget;

/** Returns the empty fill value based on the input datatype. */
const void* fill_value(Datatype type);

```
144 changes: 113 additions & 31 deletions tiledb/sm/query/reader.cc
```
@@ -613,22 +613,22 @@ Status Reader::compute_result_cell_slabs(
}

Status Reader::compute_range_result_coords(
Subarray* subarray,
unsigned frag_idx,
ResultTile* tile,
uint64_t range_idx,
std::vector<ResultCoords>* result_coords) {
auto coords_num = tile->cell_num();
auto dim_num = array_schema_->dim_num();
const auto& subarray = read_state_.partitioner_.current();
auto range_coords = subarray.get_range_coords(range_idx);
auto range_coords = subarray->get_range_coords(range_idx);

if (array_schema_->dense()) {
std::vector<uint8_t> result_bitmap(coords_num, 1);
std::vector<uint8_t> overwritten_bitmap(coords_num, 0);

// Compute result and overwritten bitmap per dimension
for (unsigned d = 0; d < dim_num; ++d) {
const auto& ranges = subarray.ranges_for_dim(d);
const auto& ranges = subarray->ranges_for_dim(d);
RETURN_NOT_OK(tile->compute_results_dense(
d,
ranges[range_coords[d]],
@@ -648,7 +648,7 @@ Status Reader::compute_range_result_coords(

// Compute result and overwritten bitmap per dimension
for (unsigned d = 0; d < dim_num; ++d) {
const auto& ranges = subarray.ranges_for_dim(d);
const auto& ranges = subarray->ranges_for_dim(d);
RETURN_NOT_OK(tile->compute_results_sparse(
d, ranges[range_coords[d]], &result_bitmap));
}
@@ -664,13 +664,14 @@
}

Status Reader::compute_range_result_coords(
Subarray* subarray,
const std::vector<bool>& single_fragment,
const std::map<std::pair<unsigned, uint64_t>, size_t>& result_tile_map,
std::vector<ResultTile>* result_tiles,
std::vector<std::vector<ResultCoords>>* range_result_coords) {
STATS_START_TIMER(stats::Stats::TimerType::READ_COMPUTE_RANGE_RESULT_COORDS)

auto range_num = read_state_.partitioner_.current().range_num();
auto range_num = subarray->range_num();
range_result_coords->resize(range_num);
auto cell_order = array_schema_->cell_order();
Layout layout =
@@ -682,12 +683,18 @@
auto statuses = parallel_for(0, range_num, [&](uint64_t r) {
// Compute overlapping coordinates per range
RETURN_NOT_OK(compute_range_result_coords(
r, result_tile_map, result_tiles, &((*range_result_coords)[r])));
subarray,
r,
result_tile_map,
result_tiles,
&((*range_result_coords)[r])));

// Dedup unless there is a single fragment or array schema allows duplicates
if (!single_fragment[r] && !allows_dups) {
RETURN_CANCEL_OR_ERROR(
sort_result_coords(&((*range_result_coords)[r]), layout));
RETURN_CANCEL_OR_ERROR(sort_result_coords(
((*range_result_coords)[r]).begin(),
((*range_result_coords)[r]).end(),
layout));
RETURN_CANCEL_OR_ERROR(dedup_result_coords(&((*range_result_coords)[r])));
}

@@ -703,13 +710,13 @@
}

Status Reader::compute_range_result_coords(
Subarray* subarray,
uint64_t range_idx,
uint32_t fragment_idx,
const std::map<std::pair<unsigned, uint64_t>, size_t>& result_tile_map,
std::vector<ResultTile>* result_tiles,
std::vector<ResultCoords>* range_result_coords) {
const auto& subarray = read_state_.partitioner_.current();
const auto& overlap = subarray.tile_overlap();
const auto& overlap = subarray->tile_overlap();

// Skip dense fragments
if (fragment_metadata_[fragment_idx]->dense())
@@ -750,7 +757,7 @@
RETURN_NOT_OK(get_all_result_coords(&tile, range_result_coords));
} else { // Partial overlap
RETURN_NOT_OK(compute_range_result_coords(
fragment_idx, &tile, range_idx, range_result_coords));
subarray, fragment_idx, &tile, range_idx, range_result_coords));
}
++t;
}
@@ -760,6 +767,7 @@
}

Status Reader::compute_range_result_coords(
Subarray* subarray,
uint64_t range_idx,
const std::map<std::pair<unsigned, uint64_t>, size_t>& result_tile_map,
std::vector<ResultTile>* result_tiles,
@@ -769,6 +777,7 @@
std::vector<std::vector<ResultCoords>> range_result_coords_vec(fragment_num);
auto statuses = parallel_for(0, fragment_num, [&](uint32_t f) {
return compute_range_result_coords(
subarray,
range_idx,
f,
result_tile_map,
@@ -790,6 +799,10 @@
Status Reader::compute_subarray_coords(
std::vector<std::vector<ResultCoords>>* range_result_coords,
std::vector<ResultCoords>* result_coords) {
// The input 'result_coords' is already sorted. Save the current size
// before inserting new elements.
const size_t result_coords_size = result_coords->size();

// Add all valid ``range_result_coords`` to ``result_coords``
for (const auto& rv : *range_result_coords) {
for (const auto& c : rv) {
@@ -806,7 +819,10 @@
auto cell_order = array_schema_->cell_order();
Layout layout = (layout_ == Layout ::UNORDERED) ? cell_order : layout_;

RETURN_NOT_OK(sort_result_coords(result_coords, layout));
RETURN_NOT_OK(sort_result_coords(
result_coords->begin() + result_coords_size,
result_coords->end(),
layout));
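// Only the newly appended tail is sorted above; the sub-partitions are
// processed in subarray order, so the already-sorted prefix of
// `result_coords` remains in its final position.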

return Status::Ok();
}
@@ -1520,16 +1536,85 @@ Status Reader::compute_result_coords(
RETURN_CANCEL_OR_ERROR(unfilter_tiles(dim_name, tmp_result_tiles));
}

// Compute the read coordinates for all fragments for each subarray range
std::vector<std::vector<ResultCoords>> range_result_coords;
RETURN_CANCEL_OR_ERROR(compute_range_result_coords(
single_fragment, result_tile_map, result_tiles, &range_result_coords));
result_tile_map.clear();
// We have experimentally determined that a 5MB sub-partitioner
// memory budget performs best. If this estimate is too low
// (e.g. unsplittable), it will be corrected in a retry.
uint64_t sub_partitioner_memory_budget =
constants::sub_partitioner_memory_budget;
uint64_t sub_partitioner_memory_budget_var =
constants::sub_partitioner_memory_budget;

// Create a sub-partitioner to partition the current subarray in
// `read_state_.partitioner_`. This allows us to compute the range
// result coords and subarray coords on a smaller set of elements.
// The motivation for this is primarily to avoid sorting a large
// number of elements within a `parallel_sort` because it has a
// time complexity of O(N*log(N)).
SubarrayPartitioner* const partitioner = &read_state_.partitioner_;
SubarrayPartitioner sub_partitioner(
partitioner->current(),
sub_partitioner_memory_budget,
sub_partitioner_memory_budget_var);

// Set the individual attribute budgets in the sub-partitioner
// to the same values as in the parent partitioner.
for (const auto& kv : *partitioner->get_result_budgets()) {
const std::string& attr_name = kv.first;
const SubarrayPartitioner::ResultBudget& result_budget = kv.second;
if (!array_schema_->var_size(attr_name)) {
RETURN_NOT_OK(sub_partitioner.set_result_budget(
attr_name.c_str(), result_budget.size_fixed_));
} else {
RETURN_NOT_OK(sub_partitioner.set_result_budget(
attr_name.c_str(),
result_budget.size_fixed_,
result_budget.size_var_));
}
}

// Compute final coords (sorted in the result layout) of the whole subarray.
RETURN_CANCEL_OR_ERROR(
compute_subarray_coords(&range_result_coords, result_coords));
range_result_coords.clear();
// Move to the first partition.
RETURN_NOT_OK(sub_partitioner.next(&read_state_.unsplittable_));

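// Iterate over the sub-partitions: each iteration computes the range
// result coords for one sub-partition, sorts them, and appends them
// to `result_coords`.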
while (true) {
// If the sub-partitioner's memory budget was too low, we may
// have been unable to split. In this scenario, double the
// budget and retry. In the worst-case scenario, the budget
// will equal the parent partitioner's budget.
while (read_state_.unsplittable_) {
uint64_t partitioner_memory_budget;
uint64_t partitioner_memory_budget_var;
RETURN_NOT_OK(partitioner->get_memory_budget(
&partitioner_memory_budget, &partitioner_memory_budget_var));

sub_partitioner_memory_budget = std::min(
partitioner_memory_budget, sub_partitioner_memory_budget * 2);
sub_partitioner_memory_budget_var = std::min(
partitioner_memory_budget_var, sub_partitioner_memory_budget_var * 2);

RETURN_NOT_OK(sub_partitioner.set_memory_budget(
sub_partitioner_memory_budget, sub_partitioner_memory_budget_var));

RETURN_NOT_OK(sub_partitioner.next(&read_state_.unsplittable_));
}

std::vector<std::vector<ResultCoords>> range_result_coords;
RETURN_CANCEL_OR_ERROR(compute_range_result_coords(
&sub_partitioner.current(),
single_fragment,
result_tile_map,
result_tiles,
&range_result_coords));

RETURN_CANCEL_OR_ERROR(
compute_subarray_coords(&range_result_coords, result_coords));
range_result_coords.clear();

// We're done when we have processed all sub-partitions.
if (sub_partitioner.done())
break;

RETURN_NOT_OK(sub_partitioner.next(&read_state_.unsplittable_));
}

return Status::Ok();

@@ -2054,7 +2139,7 @@ Status Reader::init_read_state() {
auto attr_name = a.first;
auto buffer_size = a.second.buffer_size_;
auto buffer_var_size = a.second.buffer_var_size_;
if (!array_schema_->var_size(a.first)) {
if (!array_schema_->var_size(attr_name)) {
RETURN_NOT_OK(read_state_.partitioner_.set_result_budget(
attr_name.c_str(), *buffer_size));
} else {
@@ -2063,10 +2148,6 @@
}
}

// Set memory budget
RETURN_NOT_OK(read_state_.partitioner_.set_memory_budget(
memory_budget_, memory_budget_var_));

read_state_.unsplittable_ = false;
read_state_.overflowed_ = false;
read_state_.initialized_ = true;
@@ -2319,19 +2400,20 @@ void Reader::reset_buffer_sizes() {
}

Status Reader::sort_result_coords(
std::vector<ResultCoords>* result_coords, Layout layout) const {
std::vector<ResultCoords>::iterator iter_begin,
std::vector<ResultCoords>::iterator iter_end,
Layout layout) const {
// TODO: do not sort if it is single fragment and
// (i) it is single dimension, or (ii) it is global order

auto domain = array_schema_->domain();

if (layout == Layout::ROW_MAJOR) {
parallel_sort(result_coords->begin(), result_coords->end(), RowCmp(domain));
parallel_sort(iter_begin, iter_end, RowCmp(domain));
} else if (layout == Layout::COL_MAJOR) {
parallel_sort(result_coords->begin(), result_coords->end(), ColCmp(domain));
parallel_sort(iter_begin, iter_end, ColCmp(domain));
} else if (layout == Layout::GLOBAL_ORDER) {
parallel_sort(
result_coords->begin(), result_coords->end(), GlobalCmp(domain));
parallel_sort(iter_begin, iter_end, GlobalCmp(domain));
} else {
assert(false);
}
```
15 changes: 13 additions & 2 deletions tiledb/sm/query/reader.h
```
@@ -611,13 +611,15 @@ class Reader {
* Retrieves the coordinates that overlap the input N-dimensional range
* from the input result tile.
*
* @param subarray The subarray to operate on.
* @param frag_idx The id of the fragment that the result tile belongs to.
* @param tile The result tile.
* @param range_idx The range id.
* @param result_coords The overlapping coordinates to retrieve.
* @return Status
*/
Status compute_range_result_coords(
Subarray* subarray,
unsigned frag_idx,
ResultTile* tile,
uint64_t range_idx,
@@ -627,6 +629,7 @@
* Computes the result coordinates for each range of the query
* subarray.
*
* @param subarray The subarray to operate on.
* @param single_fragment For each range, it indicates whether all
* result coordinates come from a single fragment.
* @param result_tile_map This is an auxiliary map that helps find the
@@ -637,6 +640,7 @@
* @return Status
*/
Status compute_range_result_coords(
Subarray* subarray,
const std::vector<bool>& single_fragment,
const std::map<std::pair<unsigned, uint64_t>, size_t>& result_tile_map,
std::vector<ResultTile>* result_tiles,
@@ -646,6 +650,7 @@
* Computes the result coordinates of a given range of the query
* subarray.
*
* @param subarray The subarray to operate on.
* @param range_idx The range to focus on.
* @param result_tile_map This is an auxiliary map that helps find the
* result_tiles overlapping with each range.
@@ -655,6 +660,7 @@
* @return Status
*/
Status compute_range_result_coords(
Subarray* subarray,
uint64_t range_idx,
const std::map<std::pair<unsigned, uint64_t>, size_t>& result_tile_map,
std::vector<ResultTile>* result_tiles,
@@ -664,6 +670,7 @@
* Computes the result coordinates of a given range of the query
* subarray.
*
* @param subarray The subarray to operate on.
* @param range_idx The range to focus on.
* @param fragment_idx The fragment to focus on.
* @param result_tile_map This is an auxiliary map that helps find the
Expand All @@ -674,6 +681,7 @@ class Reader {
* @return Status
*/
Status compute_range_result_coords(
Subarray* subarray,
uint64_t range_idx,
uint32_t fragment_idx,
const std::map<std::pair<unsigned, uint64_t>, size_t>& result_tile_map,
@@ -1152,12 +1160,15 @@
/**
* Sorts the input result coordinates according to the subarray layout.
*
* @param result_coords The coordinates to sort.
* @param iter_begin The start position of the coordinates to sort.
* @param iter_end The end position of the coordinates to sort.
* @param layout The layout to sort into.
* @return Status
*/
Status sort_result_coords(
std::vector<ResultCoords>* result_coords, Layout layout) const;
std::vector<ResultCoords>::iterator iter_begin,
std::vector<ResultCoords>::iterator iter_end,
Layout layout) const;

/** Performs a read on a sparse array. */
Status sparse_read();
```
