From 2b47384d56990dd08b3bf946bc78284360b5d1f7 Mon Sep 17 00:00:00 2001 From: joe maley Date: Fri, 7 Aug 2020 09:58:49 -0400 Subject: [PATCH] Global thread pool when TBB is disabled (#1760) This introduces a global thread pool for use when TBB is disabled. The performance has not been exhaustively benchmarked against TBB. However, I did test this on two readily available scenarios that I had been recently performance benchmarking for other reasons. One scenario makes heavy use of the parallel_sort path while the other does not. Surprisingly, disabling TBB performs about 10% quicker with this patch. // Scenario #1 TBB: 3.4s TBB disabled, this patch: 3.0s TBB disabled, on dev: 10.0s // Scenario #2 TBB: 3.1s TBB disabled, this patch: 2.7s TBB disabled, on dev: 9.1s For now, this patch uses the threadpool at the same scope as the TBB scheduler. It is a global thread pool, shared among a single process, and conditionally compiled. The concurrency level that the thread pool is configured with is determined from the "sm.num_tbb_threads" config. This patch does not disable TBB by default. 
Co-authored-by: Joe Maley --- HISTORY.md | 2 +- test/src/unit-capi-string_dims.cc | 15 +- test/src/unit-duplicates.cc | 21 +- tiledb/sm/c_api/tiledb.h | 9 +- tiledb/sm/cpp_api/config.h | 9 +- tiledb/sm/global_state/global_state.cc | 17 +- tiledb/sm/global_state/global_state.h | 9 +- tiledb/sm/global_state/tbb_state.cc | 90 +++++--- tiledb/sm/misc/parallel_functions.h | 294 ++++++++++++++++++++----- 9 files changed, 362 insertions(+), 104 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index de0bd8053b3..6489a13c806 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -19,7 +19,7 @@ * Split posix permissions into files and folers permissions [#1719](https://github.com/TileDB-Inc/TileDB/pull/1719) * Support seeking for CURL to allow redirects for posting to REST [#1728](https://github.com/TileDB-Inc/TileDB/pull/1728) * Changed default setting for `vfs.s3.proxy_scheme` from `https` to `http` to match common usage needs [#1759](https://github.com/TileDB-Inc/TileDB/pull/1759) - +* Enabled parallelization with native system threads when TBB is disabled [#1760](https://github.com/TileDB-Inc/TileDB/pull/1760) ## Deprecations diff --git a/test/src/unit-capi-string_dims.cc b/test/src/unit-capi-string_dims.cc index 87d179c6264..72614a45efb 100644 --- a/test/src/unit-capi-string_dims.cc +++ b/test/src/unit-capi-string_dims.cc @@ -2133,8 +2133,12 @@ TEST_CASE_METHOD( CHECK(r_d_val == "aaccccdddd"); std::vector c_d_off = {0, 2, 4, 6}; CHECK(r_d_off == c_d_off); - std::vector c_a = {1, 2, 3, 4}; - CHECK(r_a == c_a); + // The ordering of 'a' is undefined for duplicate dimension + // elements. Check both for dimension element "c". 
+ std::vector c_a_1 = {1, 3, 2, 4}; + std::vector c_a_2 = {1, 2, 3, 4}; + const bool c_a_matches = r_a == c_a_1 || r_a == c_a_2; + CHECK(c_a_matches); // Close array rc = tiledb_array_close(ctx_, array); @@ -2245,8 +2249,11 @@ TEST_CASE_METHOD( CHECK(r_d_val == "aaccdddd"); std::vector c_d_off = {0, 2, 4}; CHECK(r_d_off == c_d_off); - std::vector c_a = {1, 2, 4}; - CHECK(r_a == c_a); + // Either value for dimension index 'cc' may be de-duped. + std::vector c_a_1 = {1, 2, 4}; + std::vector c_a_2 = {1, 3, 4}; + const bool c_a_matches = r_a == c_a_1 || r_a == c_a_2; + CHECK(c_a_matches); // Close array rc = tiledb_array_close(ctx, array); diff --git a/test/src/unit-duplicates.cc b/test/src/unit-duplicates.cc index 6737ea24dd8..67dc6c946eb 100644 --- a/test/src/unit-duplicates.cc +++ b/test/src/unit-duplicates.cc @@ -200,9 +200,13 @@ TEST_CASE_METHOD( CHECK(coords_size == coords_r_size); int coords_c[] = {1, 1, 2, 4, 5}; - int data_c[] = {1, 3, 2, 4, 5}; + // The ordering for duplicates is undefined, check all variations. + int data_c_1[] = {1, 3, 2, 4, 5}; + int data_c_2[] = {3, 1, 2, 4, 5}; CHECK(!std::memcmp(coords_c, coords_r, coords_r_size)); - CHECK(!std::memcmp(data_c, data_r, data_r_size)); + const bool data_c_matches = !std::memcmp(data_c_1, data_r, data_r_size) || + !std::memcmp(data_c_2, data_r, data_r_size); + CHECK(data_c_matches); } TEST_CASE_METHOD( @@ -324,9 +328,13 @@ TEST_CASE_METHOD( CHECK(coords_1_size + coords_2_size == coords_r_size); int coords_c[] = {1, 1, 2, 4, 5}; - int data_c[] = {1, 3, 2, 4, 5}; + // The ordering for duplicates is undefined, check all variations. 
+ int data_c_1[] = {1, 3, 2, 4, 5}; + int data_c_2[] = {3, 1, 2, 4, 5}; CHECK(!std::memcmp(coords_c, coords_r, coords_r_size)); - CHECK(!std::memcmp(data_c, data_r, data_r_size)); + bool data_c_matches = !std::memcmp(data_c_1, data_r, data_r_size) || + !std::memcmp(data_c_2, data_r, data_r_size); + CHECK(data_c_matches); // Consolidate rc = tiledb_array_consolidate(ctx_, array_name_.c_str(), nullptr); @@ -371,5 +379,8 @@ TEST_CASE_METHOD( CHECK(coords_1_size + coords_2_size == coords_r_size); CHECK(!std::memcmp(coords_c, coords_r, coords_r_size)); - CHECK(!std::memcmp(data_c, data_r, data_r_size)); + // The ordering for duplicates is undefined, check all variations. + data_c_matches = !std::memcmp(data_c_1, data_r, data_r_size) || + !std::memcmp(data_c_2, data_r, data_r_size); + CHECK(data_c_matches); } diff --git a/tiledb/sm/c_api/tiledb.h b/tiledb/sm/c_api/tiledb.h index 118a911eca2..50c02c81a64 100644 --- a/tiledb/sm/c_api/tiledb.h +++ b/tiledb/sm/c_api/tiledb.h @@ -934,10 +934,11 @@ TILEDB_EXPORT void tiledb_config_free(tiledb_config_t** config); * parallel.
* **Default**: 1 * - `sm.num_tbb_threads`
- * The number of threads allocated for the TBB thread pool (if TBB is - * enabled). Note: this is a whole-program setting. Usually this should not - * be modified from the default. See also the documentation for TBB's - * `task_scheduler_init` class.
+ * The number of threads allocated for the TBB thread pool. Note: this + * is a whole-program setting. Usually this should not be modified from + * the default. See also the documentation for TBB's `task_scheduler_init` + * class. When TBB is disabled, this will be used to set the level of + * concurrency for generic threading where TBB is otherwise used.
* **Default**: TBB automatic * - `sm.consolidation.amplification`
* The factor by which the size of the dense fragment resulting diff --git a/tiledb/sm/cpp_api/config.h b/tiledb/sm/cpp_api/config.h index 8fb7d8f8d38..28cb05e7dd2 100644 --- a/tiledb/sm/cpp_api/config.h +++ b/tiledb/sm/cpp_api/config.h @@ -263,10 +263,11 @@ class Config { * parallel.
* **Default**: 1 * - `sm.num_tbb_threads`
- * The number of threads allocated for the TBB thread pool (if TBB is - * enabled). Note: this is a whole-program setting. Usually this should not - * be modified from the default. See also the documentation for TBB's - * `task_scheduler_init` class.
+ * The number of threads allocated for the TBB thread pool. Note: this + * is a whole-program setting. Usually this should not be modified from + * the default. See also the documentation for TBB's `task_scheduler_init` + * class. When TBB is disabled, this will be used to set the level of + * concurrency for generic threading where TBB is otherwise used.
* **Default**: TBB automatic * - `sm.consolidation.amplification`
* The factor by which the size of the dense fragment resulting diff --git a/tiledb/sm/global_state/global_state.cc b/tiledb/sm/global_state/global_state.cc index 92a8d4fe873..2e1702c34bb 100644 --- a/tiledb/sm/global_state/global_state.cc +++ b/tiledb/sm/global_state/global_state.cc @@ -40,7 +40,6 @@ #ifdef __linux__ #include "tiledb/sm/filesystem/posix.h" -#include "tiledb/sm/misc/thread_pool.h" #include "tiledb/sm/misc/utils.h" #endif @@ -50,7 +49,11 @@ namespace tiledb { namespace sm { namespace global_state { +#ifdef HAVE_TBB extern int tbb_nthreads_; +#else +extern std::shared_ptr global_tp_; +#endif GlobalState& GlobalState::GetGlobalState() { // This is thread-safe in C++11. @@ -127,7 +130,19 @@ int GlobalState::tbb_threads() { if (!initialized_) return 0; +#ifdef HAVE_TBB return tbb_nthreads_; +#else + return global_tp_->concurrency_level(); +#endif +} + +std::shared_ptr GlobalState::tp() { +#ifdef HAVE_TBB + return nullptr; +#else + return global_tp_; +#endif } } // namespace global_state diff --git a/tiledb/sm/global_state/global_state.h b/tiledb/sm/global_state/global_state.h index 03f4e857c33..60c9b6a336f 100644 --- a/tiledb/sm/global_state/global_state.h +++ b/tiledb/sm/global_state/global_state.h @@ -39,6 +39,7 @@ #include "tiledb/sm/config/config.h" #include "tiledb/sm/misc/status.h" +#include "tiledb/sm/misc/thread_pool.h" namespace tiledb { namespace sm { @@ -94,11 +95,17 @@ class GlobalState { /** * Gets the number of threads that TBB was initialized with. Returns - * 0 if TBB is disabled or the global state has not been initialized. + * 0 if the global state has not been initialized. * @return the number of configured TBB threads. */ int tbb_threads(); + /** + * Returns the global ThreadPool instance for use when TBB is disabled. + * @return The thread pool instance. + */ + std::shared_ptr tp(); + private: /** The TileDB configuration parameters. 
*/ Config config_; diff --git a/tiledb/sm/global_state/tbb_state.cc b/tiledb/sm/global_state/tbb_state.cc index b52d41f3611..6bab7888793 100644 --- a/tiledb/sm/global_state/tbb_state.cc +++ b/tiledb/sm/global_state/tbb_state.cc @@ -33,44 +33,62 @@ #include "tiledb/sm/global_state/tbb_state.h" #include "tiledb/sm/config/config.h" +#include "tiledb/sm/misc/thread_pool.h" -#ifdef HAVE_TBB +#include +#include +#ifdef HAVE_TBB #include -#include #include #include -#include +#endif namespace tiledb { namespace sm { namespace global_state { -/** The TBB scheduler, used for controlling the number of TBB threads. */ -static std::unique_ptr tbb_scheduler_; - -/** The number of TBB threads the scheduler was configured with **/ -int tbb_nthreads_; - -Status init_tbb(const Config* config) { - int nthreads; +Status get_nthreads(const Config* const config, int* const nthreads) { if (!config) { - nthreads = std::strtol(Config::SM_NUM_TBB_THREADS.c_str(), nullptr, 10); + *nthreads = std::strtol(Config::SM_NUM_TBB_THREADS.c_str(), nullptr, 10); } else { bool found = false; - RETURN_NOT_OK(config->get("sm.num_tbb_threads", &nthreads, &found)); + RETURN_NOT_OK(config->get("sm.num_tbb_threads", nthreads, &found)); assert(found); } - if (nthreads == tbb::task_scheduler_init::automatic) { - nthreads = tbb::task_scheduler_init::default_num_threads(); +#ifdef HAVE_TBB + if (*nthreads == tbb::task_scheduler_init::automatic) { + *nthreads = tbb::task_scheduler_init::default_num_threads(); + } +#else + if (*nthreads <= 0) { + *nthreads = std::thread::hardware_concurrency(); } - if (nthreads < 1) { +#endif + + if (*nthreads < 1) { std::stringstream msg; msg << "TBB thread runtime must be initialized with >= 1 threads, got: " - << nthreads; + << *nthreads; return Status::Error(msg.str()); } + + return Status::Ok(); +} + +#ifdef HAVE_TBB + +/** The TBB scheduler, used for controlling the number of TBB threads. 
*/ +static std::unique_ptr tbb_scheduler_; + +/** The number of TBB threads the scheduler was configured with **/ +int tbb_nthreads_; + +Status init_tbb(const Config* const config) { + int nthreads; + RETURN_NOT_OK(get_nthreads(config, &nthreads)); + if (!tbb_scheduler_) { // initialize scheduler in process for a custom number of threads (upon // first thread calling init_tbb) @@ -99,25 +117,39 @@ Status init_tbb(const Config* config) { return Status::Ok(); } -} // namespace global_state -} // namespace sm -} // namespace tiledb - #else -namespace tiledb { -namespace sm { -namespace global_state { - -int tbb_nthreads_ = 0; +/** The ThreadPool to use when TBB is disabled. */ +std::shared_ptr global_tp_; Status init_tbb(const Config* config) { - (void)config; + int nthreads; + RETURN_NOT_OK(get_nthreads(config, &nthreads)); + + if (!global_tp_) { + global_tp_ = std::make_shared(); + const Status st = global_tp_->init(nthreads); + if (!st.ok()) { + global_tp_ = nullptr; + } + RETURN_NOT_OK(st); + } else { + // If the thread pool has already been initialized, check + // invariants. + if (nthreads != static_cast(global_tp_->concurrency_level())) { + std::stringstream msg; + msg << "Global thread pool must be initialized with the same number of " + "threads: " + << nthreads << " != " << global_tp_->concurrency_level(); + return Status::Error(msg.str()); + } + } + return Status::Ok(); } +#endif + } // namespace global_state } // namespace sm } // namespace tiledb - -#endif diff --git a/tiledb/sm/misc/parallel_functions.h b/tiledb/sm/misc/parallel_functions.h index dcfe2ea70ba..eab1c60cb39 100644 --- a/tiledb/sm/misc/parallel_functions.h +++ b/tiledb/sm/misc/parallel_functions.h @@ -33,6 +33,8 @@ #ifndef TILEDB_PARALLEL_FUNCTIONS_H #define TILEDB_PARALLEL_FUNCTIONS_H +#include "tiledb/sm/global_state/global_state.h" + #include #include @@ -55,74 +57,176 @@ namespace sm { * @param end End of range to sort (exclusive). * @param cmp Comparator. 
*/ -template -void parallel_sort(IterT begin, IterT end, CmpT cmp) { +template < + typename IterT, + typename CmpT = std::less::value_type>> +void parallel_sort(IterT begin, IterT end, const CmpT& cmp = CmpT()) { #ifdef HAVE_TBB tbb::parallel_sort(begin, end, cmp); #else - std::sort(begin, end, cmp); -#endif -} + // Sort the range using a quicksort. The algorithm is: + // 1. Pick a pivot value in the range. + // 2. Re-order the range so that all values less than the + // pivot value are ordered left of the pivot's index. + // 3. Recursively invoke step 1. + // + // To parallelize the algorithm, step #3 is modified to execute + // the recursion on the global thread pool. + std::shared_ptr tp = + global_state::GlobalState::GetGlobalState().tp(); -/** - * Sort the given iterator range, possibly in parallel. - * - * @tparam IterT Iterator type - * @param begin Beginning of range to sort (inclusive). - * @param end End of range to sort (exclusive). - */ -template -void parallel_sort(IterT begin, IterT end) { -#ifdef HAVE_TBB - tbb::parallel_sort(begin, end); -#else - std::sort(begin, end); -#endif -} + // Calculate the maximum height of the recursive call stack tree + // where each leaf node can be assigned to a single level of + // concurrency on the thread pool. The motiviation is to stop the + // recursion when all concurrency levels are utilized. When all + // concurrency levels have their own subrange to operate on, we + // will stop the quicksort and invoke std::sort() to sort the + // their subrange. + uint64_t height = 1; + uint64_t width = 1; + while (width <= tp->concurrency_level()) { + ++height; + width = 2 * width; + } -/** - * Call the given function on each element in the given iterator range, - * possibly in parallel. - * - * @tparam IterT Iterator type - * @tparam FuncT Function type (returning Status). - * @param begin Beginning of range to sort (inclusive). - * @param end End of range to sort (exclusive). 
- * @param F Function to call on each item - * @return Vector of Status objects, one for each function invocation. - */ -template -std::vector parallel_for_each(IterT begin, IterT end, const FuncT& F) { - auto niters = static_cast(std::distance(begin, end)); - std::vector result(niters); -#ifdef HAVE_TBB - tbb::parallel_for(uint64_t(0), niters, [begin, &result, &F](uint64_t i) { - auto it = std::next(begin, i); - result[i] = F(*it); - }); -#else - for (uint64_t i = 0; i < niters; i++) { - auto it = std::next(begin, i); - result[i] = F(*it); + // To fully utilize the all concurrency levels, increment + // the target height of the call stack tree if there would + // be idle concurrency levels. + if (width > tp->concurrency_level()) { + ++height; } + + // Define a work routine that encapsulates steps #1 - #3 in the + // algorithm. + std::function quick_sort; + quick_sort = [&](const uint64_t depth, IterT begin, IterT end) -> Status { + const size_t elements = std::distance(begin, end); + + // Stop the recursion if this subrange does not contain + // any elements to sort. + if (elements <= 1) { + return Status::Ok(); + } + + // If there are only two elements remaining, directly sort them. + if (elements <= 2) { + std::sort(begin, end, cmp); + return Status::Ok(); + } + + // If we have reached the target height of the call stack tree, + // we will defer sorting to std::sort() to finish sorting the + // subrange. This is an optimization because all concurrency + // levels in the threadpool should be utilized if the work was + // evenly distributed among them. + if (depth + 1 == height) { + std::sort(begin, end, cmp); + return Status::Ok(); + } + + // Step #1: Pick a pivot value in the range. + auto pivot_iter = begin + (elements / 2); + const auto pivot_value = *pivot_iter; + if ((end - 1) != pivot_iter) + std::iter_swap((end - 1), pivot_iter); + + // Step #2: Partition the subrange. 
+ auto middle = begin; + for (auto iter = begin; iter != end - 1; ++iter) { + if (cmp(*iter, pivot_value)) { + std::iter_swap(middle, iter); + ++middle; + } + } + std::iter_swap(middle, (end - 1)); + + // Step #3: Recursively sort the left and right partitions. + std::vector> tasks; + if (begin != middle) { + std::function quick_sort_left = + std::bind(quick_sort, depth + 1, begin, middle); + std::future left_task = tp->execute(std::move(quick_sort_left)); + tasks.emplace_back(std::move(left_task)); + } + if (middle != end) { + std::function quick_sort_right = + std::bind(quick_sort, depth + 1, middle + 1, end); + std::future right_task = tp->execute(std::move(quick_sort_right)); + tasks.emplace_back(std::move(right_task)); + } + + // Wait for the sorted partitions. + tp->wait_all(tasks); + return Status::Ok(); + }; + + // Start the quicksort from the entire range. + quick_sort(0, begin, end); #endif - return result; } template std::vector parallel_for(uint64_t begin, uint64_t end, const FuncT& F) { assert(begin <= end); - uint64_t num_iters = end - begin + 1; - std::vector result(num_iters); + + const uint64_t range_len = end - begin; + std::vector result(range_len); + + if (range_len == 0) + return result; + #ifdef HAVE_TBB tbb::parallel_for(begin, end, [begin, &result, &F](uint64_t i) { result[i - begin] = F(i); }); #else - for (uint64_t i = begin; i < end; i++) { - result[i - begin] = F(i); + // Executes subrange [subrange_start, subrange_end) that exists + // within the range [begin, end). + std::function execute_subrange = + [begin, &result, &F]( + const uint64_t subrange_start, + const uint64_t subrange_end) -> Status { + for (uint64_t i = subrange_start; i < subrange_end; ++i) + result[i - begin] = F(i); + + return Status::Ok(); + }; + + // Fetch the global thread pool. + std::shared_ptr tp = + global_state::GlobalState::GetGlobalState().tp(); + + // Calculate the length of the subrange that each thread will + // be responsible for. 
+ const uint64_t concurrency_level = tp->concurrency_level(); + const uint64_t subrange_len = range_len / concurrency_level; + const uint64_t subrange_len_carry = range_len % concurrency_level; + + // Execute a bound instance of `execute_subrange` for each + // subrange on the global thread pool. + uint64_t fn_iter = 0; + std::vector> tasks; + tasks.reserve(concurrency_level); + for (size_t i = 0; i < concurrency_level; ++i) { + const uint64_t task_subrange_len = + subrange_len + ((i < subrange_len_carry) ? 1 : 0); + + if (task_subrange_len == 0) + break; + + const uint64_t subrange_start = begin + fn_iter; + const uint64_t subrange_end = begin + fn_iter + task_subrange_len; + std::function bound_fn = + std::bind(execute_subrange, subrange_start, subrange_end); + tasks.emplace_back(tp->execute(std::move(bound_fn))); + + fn_iter += task_subrange_len; } + + // Wait for all instances of `execute_subrange` to complete. + tp->wait_all(tasks); #endif + return result; } @@ -143,10 +247,10 @@ std::vector parallel_for_2d( uint64_t i0, uint64_t i1, uint64_t j0, uint64_t j1, const FuncT& F) { assert(i0 <= i1); assert(j0 <= j1); +#ifdef HAVE_TBB const uint64_t num_i_iters = i1 - i0 + 1, num_j_iters = j1 - j0 + 1; const uint64_t num_iters = num_i_iters * num_j_iters; std::vector result(num_iters); -#ifdef HAVE_TBB auto range = tbb::blocked_range2d(i0, i1, j0, j1); tbb::parallel_for( range, @@ -161,15 +265,95 @@ std::vector parallel_for_2d( } } }); + return result; #else - for (uint64_t i = i0; i < i1; i++) { - for (uint64_t j = j0; j < j1; j++) { - uint64_t idx = (i - i0) * num_j_iters + (j - j0); - result[idx] = F(i, j); + const uint64_t range_len_i = i1 - i0; + const uint64_t range_len_j = j1 - j0; + const uint64_t size_ij = range_len_i * range_len_j; + std::vector result(size_ij); + + if (size_ij == 0) + return result; + + // Fetch the global thread pool. 
+ std::shared_ptr tp = + global_state::GlobalState::GetGlobalState().tp(); + + // Calculate the length of the subrange-i and subrange-j that + // each thread will be responsible for. + const uint64_t concurrency_level = tp->concurrency_level(); + const uint64_t subrange_len_i = range_len_i / concurrency_level; + const uint64_t subrange_len_i_carry = range_len_i % concurrency_level; + const uint64_t subrange_len_j = range_len_j / concurrency_level; + const uint64_t subrange_len_j_carry = range_len_j % concurrency_level; + + // Executes subarray [begin_i, end_i) x [start_j, end_j) within the + // array [i0, i1) x [j0, j1). + std::function + execute_subrange_ij = [i0, j0, j1, &result, &F]( + const uint64_t begin_i, + const uint64_t end_i, + const uint64_t start_j, + const uint64_t end_j) -> Status { + for (uint64_t i = begin_i; i < end_i; ++i) { + for (uint64_t j = start_j; j < end_j; ++j) { + const uint64_t idx = (i - i0) * (j1 - j0) + (j - j0); + result[idx] = F(i, j); + } + } + + return Status::Ok(); + }; + + // Calculate the subranges for each dimension, i and j. + std::vector> subranges_i; + std::vector> subranges_j; + uint64_t iter_i = 0; + uint64_t iter_j = 0; + for (size_t t = 0; t < concurrency_level; ++t) { + const uint64_t task_subrange_len_i = + subrange_len_i + ((t < subrange_len_i_carry) ? 1 : 0); + const uint64_t task_subrange_len_j = + subrange_len_j + ((t < subrange_len_j_carry) ? 
1 : 0); + + if (task_subrange_len_i == 0 && task_subrange_len_j == 0) + break; + + if (task_subrange_len_i > 0) { + const uint64_t begin_i = i0 + iter_i; + const uint64_t end_i = i0 + iter_i + task_subrange_len_i; + subranges_i.emplace_back(begin_i, end_i); + iter_i += task_subrange_len_i; + } + + if (task_subrange_len_j > 0) { + const uint64_t start_j = j0 + iter_j; + const uint64_t end_j = j0 + iter_j + task_subrange_len_j; + subranges_j.emplace_back(start_j, end_j); + iter_j += task_subrange_len_j; } } -#endif + + // Execute a bound instance of `execute_subrange_ij` for each + // 2D subarray on the global thread pool. + std::vector> tasks; + tasks.reserve(concurrency_level * concurrency_level); + for (const auto& subrange_i : subranges_i) { + for (const auto& subrange_j : subranges_j) { + std::function bound_fn = std::bind( + execute_subrange_ij, + subrange_i.first, + subrange_i.second, + subrange_j.first, + subrange_j.second); + tasks.emplace_back(tp->execute(std::move(bound_fn))); + } + } + + // Wait for all instances of `execute_subrange` to complete. + tp->wait_all(tasks); return result; +#endif } } // namespace sm