Skip to content

Commit

Permalink
Reduce Status objects in parallel functions (#1761)
Browse files Browse the repository at this point in the history
Currently, we return a vector<Status> in the parallel_for/parallel_for_2d
routines. The return vector contains a status for each invocation of the
function. This can be 100,000+ objects. This patch modifies the routines to
only return the first failed status.

Co-authored-by: Joe Maley <[email protected]>
  • Loading branch information
joe maley and Joe Maley authored Aug 10, 2020
1 parent 2b47384 commit 4878ae6
Showing 1 changed file with 46 additions and 13 deletions.
59 changes: 46 additions & 13 deletions tiledb/sm/misc/parallel_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,25 +169,39 @@ template <typename FuncT>
std::vector<Status> parallel_for(uint64_t begin, uint64_t end, const FuncT& F) {
assert(begin <= end);

const uint64_t range_len = end - begin;
std::vector<Status> result(range_len);
std::vector<Status> result;

const uint64_t range_len = end - begin;
if (range_len == 0)
return result;

#ifdef HAVE_TBB
result.resize(range_len);
tbb::parallel_for(begin, end, [begin, &result, &F](uint64_t i) {
result[i - begin] = F(i);
});
#else
// The return vector will be a single Status object containing
// the first failed status that we encounter. When we remove TBB,
// we will change the interface to return a single Status object.
bool failed = false;
Status return_st = Status::Ok();
std::mutex return_st_mutex;

// Executes subrange [subrange_start, subrange_end) that exists
// within the range [begin, end).
std::function<Status(uint64_t, uint64_t)> execute_subrange =
[begin, &result, &F](
[&failed, &return_st, &return_st_mutex, &F](
const uint64_t subrange_start,
const uint64_t subrange_end) -> Status {
for (uint64_t i = subrange_start; i < subrange_end; ++i)
result[i - begin] = F(i);
for (uint64_t i = subrange_start; i < subrange_end; ++i) {
const Status st = F(i);
if (!st.ok() && !failed) {
failed = true;
std::lock_guard<std::mutex> lock(return_st_mutex);
return_st = st;
}
}

return Status::Ok();
};
Expand Down Expand Up @@ -225,6 +239,9 @@ std::vector<Status> parallel_for(uint64_t begin, uint64_t end, const FuncT& F) {

// Wait for all instances of `execute_subrange` to complete.
tp->wait_all(tasks);

// Store the final result.
result.emplace_back(std::move(return_st));
#endif

return result;
Expand All @@ -247,10 +264,13 @@ std::vector<Status> parallel_for_2d(
uint64_t i0, uint64_t i1, uint64_t j0, uint64_t j1, const FuncT& F) {
assert(i0 <= i1);
assert(j0 <= j1);

std::vector<Status> result;

#ifdef HAVE_TBB
const uint64_t num_i_iters = i1 - i0 + 1, num_j_iters = j1 - j0 + 1;
const uint64_t num_iters = num_i_iters * num_j_iters;
std::vector<Status> result(num_iters);
result.resize(num_iters);
auto range = tbb::blocked_range2d<uint64_t>(i0, i1, j0, j1);
tbb::parallel_for(
range,
Expand All @@ -269,12 +289,17 @@ std::vector<Status> parallel_for_2d(
#else
const uint64_t range_len_i = i1 - i0;
const uint64_t range_len_j = j1 - j0;
const uint64_t size_ij = range_len_i * range_len_j;
std::vector<Status> result(size_ij);

if (size_ij == 0)
if (range_len_i == 0 || range_len_j == 0)
return result;

// The return vector will be a single Status object containing
// the first failed status that we encounter. When we remove TBB,
// we will change the interface to return a single Status object.
bool failed = false;
Status return_st = Status::Ok();
std::mutex return_st_mutex;

// Fetch the global thread pool.
std::shared_ptr<ThreadPool> tp =
global_state::GlobalState::GetGlobalState().tp();
Expand All @@ -290,15 +315,19 @@ std::vector<Status> parallel_for_2d(
// Executes subarray [begin_i, end_i) x [start_j, end_j) within the
// array [i0, i1) x [j0, j1).
std::function<Status(uint64_t, uint64_t, uint64_t, uint64_t)>
execute_subrange_ij = [i0, j0, j1, &result, &F](
execute_subrange_ij = [&failed, &return_st, &return_st_mutex, &F](
const uint64_t begin_i,
const uint64_t end_i,
const uint64_t start_j,
const uint64_t end_j) -> Status {
for (uint64_t i = begin_i; i < end_i; ++i) {
for (uint64_t j = start_j; j < end_j; ++j) {
const uint64_t idx = (i - i0) * (j1 - j0) + (j - j0);
result[idx] = F(i, j);
const Status st = F(i, j);
if (!st.ok() && !failed) {
failed = true;
std::lock_guard<std::mutex> lock(return_st_mutex);
return_st = st;
}
}
}

Expand Down Expand Up @@ -352,8 +381,12 @@ std::vector<Status> parallel_for_2d(

// Wait for all instances of `execute_subrange` to complete.
tp->wait_all(tasks);
return result;

// Store the final result.
result.emplace_back(std::move(return_st));
#endif

return result;
}

} // namespace sm
Expand Down

0 comments on commit 4878ae6

Please sign in to comment.