Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce Status objects in parallel functions #1761

Merged
merged 1 commit into from
Aug 10, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 46 additions & 13 deletions tiledb/sm/misc/parallel_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,25 +169,39 @@ template <typename FuncT>
std::vector<Status> parallel_for(uint64_t begin, uint64_t end, const FuncT& F) {
assert(begin <= end);

const uint64_t range_len = end - begin;
std::vector<Status> result(range_len);
std::vector<Status> result;

const uint64_t range_len = end - begin;
if (range_len == 0)
return result;

#ifdef HAVE_TBB
result.resize(range_len);
tbb::parallel_for(begin, end, [begin, &result, &F](uint64_t i) {
result[i - begin] = F(i);
});
#else
// The return vector will be a single Status object containing
// the first failed status that we encounter. When we remove TBB,
// we will change the interface to return a single Status object.
bool failed = false;
Status return_st = Status::Ok();
std::mutex return_st_mutex;

// Executes subrange [subrange_start, subrange_end) that exists
// within the range [begin, end).
std::function<Status(uint64_t, uint64_t)> execute_subrange =
[begin, &result, &F](
[&failed, &return_st, &return_st_mutex, &F](
const uint64_t subrange_start,
const uint64_t subrange_end) -> Status {
for (uint64_t i = subrange_start; i < subrange_end; ++i)
result[i - begin] = F(i);
for (uint64_t i = subrange_start; i < subrange_end; ++i) {
const Status st = F(i);
if (!st.ok() && !failed) {
failed = true;
std::lock_guard<std::mutex> lock(return_st_mutex);
return_st = st;
}
}

return Status::Ok();
};
Expand Down Expand Up @@ -225,6 +239,9 @@ std::vector<Status> parallel_for(uint64_t begin, uint64_t end, const FuncT& F) {

// Wait for all instances of `execute_subrange` to complete.
tp->wait_all(tasks);

// Store the final result.
result.emplace_back(std::move(return_st));
#endif

return result;
Expand All @@ -247,10 +264,13 @@ std::vector<Status> parallel_for_2d(
uint64_t i0, uint64_t i1, uint64_t j0, uint64_t j1, const FuncT& F) {
assert(i0 <= i1);
assert(j0 <= j1);

std::vector<Status> result;

#ifdef HAVE_TBB
const uint64_t num_i_iters = i1 - i0 + 1, num_j_iters = j1 - j0 + 1;
const uint64_t num_iters = num_i_iters * num_j_iters;
std::vector<Status> result(num_iters);
result.resize(num_iters);
auto range = tbb::blocked_range2d<uint64_t>(i0, i1, j0, j1);
tbb::parallel_for(
range,
Expand All @@ -269,12 +289,17 @@ std::vector<Status> parallel_for_2d(
#else
const uint64_t range_len_i = i1 - i0;
const uint64_t range_len_j = j1 - j0;
const uint64_t size_ij = range_len_i * range_len_j;
std::vector<Status> result(size_ij);

if (size_ij == 0)
if (range_len_i == 0 || range_len_j == 0)
return result;

// The return vector will be a single Status object containing
// the first failed status that we encounter. When we remove TBB,
// we will change the interface to return a single Status object.
bool failed = false;
Status return_st = Status::Ok();
std::mutex return_st_mutex;

// Fetch the global thread pool.
std::shared_ptr<ThreadPool> tp =
global_state::GlobalState::GetGlobalState().tp();
Expand All @@ -290,15 +315,19 @@ std::vector<Status> parallel_for_2d(
// Executes subarray [begin_i, end_i) x [start_j, end_j) within the
// array [i0, i1) x [j0, j1).
std::function<Status(uint64_t, uint64_t, uint64_t, uint64_t)>
execute_subrange_ij = [i0, j0, j1, &result, &F](
execute_subrange_ij = [&failed, &return_st, &return_st_mutex, &F](
const uint64_t begin_i,
const uint64_t end_i,
const uint64_t start_j,
const uint64_t end_j) -> Status {
for (uint64_t i = begin_i; i < end_i; ++i) {
for (uint64_t j = start_j; j < end_j; ++j) {
const uint64_t idx = (i - i0) * (j1 - j0) + (j - j0);
result[idx] = F(i, j);
const Status st = F(i, j);
if (!st.ok() && !failed) {
failed = true;
std::lock_guard<std::mutex> lock(return_st_mutex);
return_st = st;
}
}
}

Expand Down Expand Up @@ -352,8 +381,12 @@ std::vector<Status> parallel_for_2d(

// Wait for all instances of `execute_subrange` to complete.
tp->wait_all(tasks);
return result;

// Store the final result.
result.emplace_back(std::move(return_st));
#endif

return result;
}

} // namespace sm
Expand Down