From 4878ae69f077a1345ca0a7589e2d663802238e37 Mon Sep 17 00:00:00 2001 From: joe maley Date: Mon, 10 Aug 2020 15:41:10 -0400 Subject: [PATCH] Reduce Status objects in parallel functions (#1761) Currently, we return a vector in the parallel_for/parallel_for_2d routines. The return vector contains a status for each invocation of the function. This can be 100,000+ objects. This patch modifies the routines to only return the first failed status. Co-authored-by: Joe Maley --- tiledb/sm/misc/parallel_functions.h | 59 ++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/tiledb/sm/misc/parallel_functions.h b/tiledb/sm/misc/parallel_functions.h index eab1c60cb39..823849fc100 100644 --- a/tiledb/sm/misc/parallel_functions.h +++ b/tiledb/sm/misc/parallel_functions.h @@ -169,25 +169,39 @@ template std::vector parallel_for(uint64_t begin, uint64_t end, const FuncT& F) { assert(begin <= end); - const uint64_t range_len = end - begin; - std::vector result(range_len); + std::vector result; + const uint64_t range_len = end - begin; if (range_len == 0) return result; #ifdef HAVE_TBB + result.resize(range_len); tbb::parallel_for(begin, end, [begin, &result, &F](uint64_t i) { result[i - begin] = F(i); }); #else + // The return vector will be a single Status object containing + // the first failed status that we encounter. When we remove TBB, + // we will change the interface to return a single Status object. + bool failed = false; + Status return_st = Status::Ok(); + std::mutex return_st_mutex; + // Executes subrange [subrange_start, subrange_end) that exists // within the range [begin, end). std::function execute_subrange = - [begin, &result, &F]( + [&failed, &return_st, &return_st_mutex, &F]( const uint64_t subrange_start, const uint64_t subrange_end) -> Status { - for (uint64_t i = subrange_start; i < subrange_end; ++i) - result[i - begin] = F(i); + for (uint64_t i = subrange_start; i < subrange_end; ++i) { + const Status st = F(i); + if (!st.ok() && !failed) { + failed = true; + std::lock_guard lock(return_st_mutex); + return_st = st; + } + } return Status::Ok(); }; @@ -225,6 +239,9 @@ std::vector parallel_for(uint64_t begin, uint64_t end, const FuncT& F) { // Wait for all instances of `execute_subrange` to complete. tp->wait_all(tasks); + + // Store the final result. + result.emplace_back(std::move(return_st)); #endif return result; @@ -247,10 +264,13 @@ std::vector parallel_for_2d( uint64_t i0, uint64_t i1, uint64_t j0, uint64_t j1, const FuncT& F) { assert(i0 <= i1); assert(j0 <= j1); + + std::vector result; + #ifdef HAVE_TBB const uint64_t num_i_iters = i1 - i0 + 1, num_j_iters = j1 - j0 + 1; const uint64_t num_iters = num_i_iters * num_j_iters; - std::vector result(num_iters); + result.resize(num_iters); auto range = tbb::blocked_range2d(i0, i1, j0, j1); tbb::parallel_for( range, @@ -269,12 +289,17 @@ std::vector parallel_for_2d( #else const uint64_t range_len_i = i1 - i0; const uint64_t range_len_j = j1 - j0; - const uint64_t size_ij = range_len_i * range_len_j; - std::vector result(size_ij); - if (size_ij == 0) + if (range_len_i == 0 || range_len_j == 0) return result; + // The return vector will be a single Status object containing + // the first failed status that we encounter. When we remove TBB, + // we will change the interface to return a single Status object. + bool failed = false; + Status return_st = Status::Ok(); + std::mutex return_st_mutex; + // Fetch the global thread pool. std::shared_ptr tp = global_state::GlobalState::GetGlobalState().tp(); @@ -290,15 +315,19 @@ std::vector parallel_for_2d( // Executes subarray [begin_i, end_i) x [start_j, end_j) within the // array [i0, i1) x [j0, j1). std::function - execute_subrange_ij = [i0, j0, j1, &result, &F]( + execute_subrange_ij = [&failed, &return_st, &return_st_mutex, &F]( const uint64_t begin_i, const uint64_t end_i, const uint64_t start_j, const uint64_t end_j) -> Status { for (uint64_t i = begin_i; i < end_i; ++i) { for (uint64_t j = start_j; j < end_j; ++j) { - const uint64_t idx = (i - i0) * (j1 - j0) + (j - j0); - result[idx] = F(i, j); + const Status st = F(i, j); + if (!st.ok() && !failed) { + failed = true; + std::lock_guard lock(return_st_mutex); + return_st = st; + } } } @@ -352,8 +381,12 @@ std::vector parallel_for_2d( // Wait for all instances of `execute_subrange` to complete. tp->wait_all(tasks); - return result; + + // Store the final result. + result.emplace_back(std::move(return_st)); #endif + + return result; } } // namespace sm