Skip to content

Commit

Permalink
Global thread pool when TBB is disabled (#1760)
Browse files Browse the repository at this point in the history
This introduces a global thread pool for use when TBB is disabled. The
performance has not been exhaustively benchmarked against TBB. However, I did
test this on two readily available scenarios that I had been recently performance
benchmarking for other reasons. One scenario makes heavy use of the parallel_sort
path while the other does not. Surprisingly, disabling TBB performs about 10%
quicker with this pach.

// Scenario #1
TBB: 3.4s
TBB disabled, this patch: 3.0s
TBB disabled, on dev: 10.0s

// Scenario #2
TBB: 3.1
TBB disabled, this patch: 2.7s
TBB disabled, on dev: 9.1s

For now, this patch uses the threadpool at the same scope as the TBB scheduler.
It is a global thread pool, shared among a single process, and conditionally
compiled. The concurrency level that the thread pool is configured with is
determined from the "sm.num_tbb_threads" config.

This patch does not disable TBB by default.

Co-authored-by: Joe Maley <[email protected]>
  • Loading branch information
joe maley and Joe Maley authored Aug 7, 2020
1 parent d34929a commit 2b47384
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 104 deletions.
2 changes: 1 addition & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* Split posix permissions into files and folers permissions [#1719](https://github.com/TileDB-Inc/TileDB/pull/1719)
* Support seeking for CURL to allow redirects for posting to REST [#1728](https://github.com/TileDB-Inc/TileDB/pull/1728)
* Changed default setting for `vfs.s3.proxy_scheme` from `https` to `http` to match common usage needs [#1759](https://github.com/TileDB-Inc/TileDB/pull/1759)

* Enabled parallelization with native system threads when TBB is disabled [#1760](https://github.com/TileDB-Inc/TileDB/pull/1760)

## Deprecations

Expand Down
15 changes: 11 additions & 4 deletions test/src/unit-capi-string_dims.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2133,8 +2133,12 @@ TEST_CASE_METHOD(
CHECK(r_d_val == "aaccccdddd");
std::vector<uint64_t> c_d_off = {0, 2, 4, 6};
CHECK(r_d_off == c_d_off);
std::vector<int32_t> c_a = {1, 2, 3, 4};
CHECK(r_a == c_a);
// The ordering of 'a' is undefined for duplicate dimension
// elements. Check both for dimension element "c".
std::vector<int32_t> c_a_1 = {1, 3, 2, 4};
std::vector<int32_t> c_a_2 = {1, 2, 3, 4};
const bool c_a_matches = r_a == c_a_1 || r_a == c_a_2;
CHECK(c_a_matches);

// Close array
rc = tiledb_array_close(ctx_, array);
Expand Down Expand Up @@ -2245,8 +2249,11 @@ TEST_CASE_METHOD(
CHECK(r_d_val == "aaccdddd");
std::vector<uint64_t> c_d_off = {0, 2, 4};
CHECK(r_d_off == c_d_off);
std::vector<int32_t> c_a = {1, 2, 4};
CHECK(r_a == c_a);
// Either value for dimension index 'cc' may be de-duped.
std::vector<int32_t> c_a_1 = {1, 2, 4};
std::vector<int32_t> c_a_2 = {1, 3, 4};
const bool c_a_matches = r_a == c_a_1 || r_a == c_a_2;
CHECK(c_a_matches);

// Close array
rc = tiledb_array_close(ctx, array);
Expand Down
21 changes: 16 additions & 5 deletions test/src/unit-duplicates.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,13 @@ TEST_CASE_METHOD(
CHECK(coords_size == coords_r_size);

int coords_c[] = {1, 1, 2, 4, 5};
int data_c[] = {1, 3, 2, 4, 5};
// The ordering for duplicates is undefined, check all variations.
int data_c_1[] = {1, 3, 2, 4, 5};
int data_c_2[] = {3, 1, 2, 4, 5};
CHECK(!std::memcmp(coords_c, coords_r, coords_r_size));
CHECK(!std::memcmp(data_c, data_r, data_r_size));
const bool data_c_matches = !std::memcmp(data_c_1, data_r, data_r_size) ||
!std::memcmp(data_c_2, data_r, data_r_size);
CHECK(data_c_matches);
}

TEST_CASE_METHOD(
Expand Down Expand Up @@ -324,9 +328,13 @@ TEST_CASE_METHOD(
CHECK(coords_1_size + coords_2_size == coords_r_size);

int coords_c[] = {1, 1, 2, 4, 5};
int data_c[] = {1, 3, 2, 4, 5};
// The ordering for duplicates is undefined, check all variations.
int data_c_1[] = {1, 3, 2, 4, 5};
int data_c_2[] = {3, 1, 2, 4, 5};
CHECK(!std::memcmp(coords_c, coords_r, coords_r_size));
CHECK(!std::memcmp(data_c, data_r, data_r_size));
bool data_c_matches = !std::memcmp(data_c_1, data_r, data_r_size) ||
!std::memcmp(data_c_2, data_r, data_r_size);
CHECK(data_c_matches);

// Consolidate
rc = tiledb_array_consolidate(ctx_, array_name_.c_str(), nullptr);
Expand Down Expand Up @@ -371,5 +379,8 @@ TEST_CASE_METHOD(
CHECK(coords_1_size + coords_2_size == coords_r_size);

CHECK(!std::memcmp(coords_c, coords_r, coords_r_size));
CHECK(!std::memcmp(data_c, data_r, data_r_size));
// The ordering for duplicates is undefined, check all variations.
data_c_matches = !std::memcmp(data_c_1, data_r, data_r_size) ||
!std::memcmp(data_c_2, data_r, data_r_size);
CHECK(data_c_matches);
}
9 changes: 5 additions & 4 deletions tiledb/sm/c_api/tiledb.h
Original file line number Diff line number Diff line change
Expand Up @@ -934,10 +934,11 @@ TILEDB_EXPORT void tiledb_config_free(tiledb_config_t** config);
* parallel.<br>
* **Default**: 1
* - `sm.num_tbb_threads` <br>
* The number of threads allocated for the TBB thread pool (if TBB is
* enabled). Note: this is a whole-program setting. Usually this should not
* be modified from the default. See also the documentation for TBB's
* `task_scheduler_init` class.<br>
* The number of threads allocated for the TBB thread pool. Note: this
* is a whole-program setting. Usually this should not be modified from
* the default. See also the documentation for TBB's `task_scheduler_init`
* class. When TBB is disabled, this will be used to set the level of
* concurrency for generic threading where TBB is otherwise used. <br>
* **Default**: TBB automatic
* - `sm.consolidation.amplification` <br>
* The factor by which the size of the dense fragment resulting
Expand Down
9 changes: 5 additions & 4 deletions tiledb/sm/cpp_api/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,11 @@ class Config {
* parallel.<br>
* **Default**: 1
* - `sm.num_tbb_threads` <br>
* The number of threads allocated for the TBB thread pool (if TBB is
* enabled). Note: this is a whole-program setting. Usually this should not
* be modified from the default. See also the documentation for TBB's
* `task_scheduler_init` class.<br>
* The number of threads allocated for the TBB thread pool. Note: this
* is a whole-program setting. Usually this should not be modified from
* the default. See also the documentation for TBB's `task_scheduler_init`
* class. When TBB is disabled, this will be used to set the level of
* concurrency for generic threading where TBB is otherwise used. <br>
* **Default**: TBB automatic
* - `sm.consolidation.amplification` <br>
* The factor by which the size of the dense fragment resulting
Expand Down
17 changes: 16 additions & 1 deletion tiledb/sm/global_state/global_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

#ifdef __linux__
#include "tiledb/sm/filesystem/posix.h"
#include "tiledb/sm/misc/thread_pool.h"
#include "tiledb/sm/misc/utils.h"
#endif

Expand All @@ -50,7 +49,11 @@ namespace tiledb {
namespace sm {
namespace global_state {

#ifdef HAVE_TBB
extern int tbb_nthreads_;
#else
extern std::shared_ptr<ThreadPool> global_tp_;
#endif

GlobalState& GlobalState::GetGlobalState() {
// This is thread-safe in C++11.
Expand Down Expand Up @@ -127,7 +130,19 @@ int GlobalState::tbb_threads() {
if (!initialized_)
return 0;

#ifdef HAVE_TBB
return tbb_nthreads_;
#else
return global_tp_->concurrency_level();
#endif
}

std::shared_ptr<ThreadPool> GlobalState::tp() {
#ifdef HAVE_TBB
return nullptr;
#else
return global_tp_;
#endif
}

} // namespace global_state
Expand Down
9 changes: 8 additions & 1 deletion tiledb/sm/global_state/global_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#include "tiledb/sm/config/config.h"
#include "tiledb/sm/misc/status.h"
#include "tiledb/sm/misc/thread_pool.h"

namespace tiledb {
namespace sm {
Expand Down Expand Up @@ -94,11 +95,17 @@ class GlobalState {

/**
* Gets the number of threads that TBB was initialized with. Returns
* 0 if TBB is disabled or the global state has not been initialized.
* 0 if the global state has not been initialized.
* @return the number of configured TBB threads.
*/
int tbb_threads();

/**
* Returns the global ThreadPool instance for use when TBB is disabled.
* @return The thread pool instance.
*/
std::shared_ptr<ThreadPool> tp();

private:
/** The TileDB configuration parameters. */
Config config_;
Expand Down
90 changes: 61 additions & 29 deletions tiledb/sm/global_state/tbb_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,44 +33,62 @@

#include "tiledb/sm/global_state/tbb_state.h"
#include "tiledb/sm/config/config.h"
#include "tiledb/sm/misc/thread_pool.h"

#ifdef HAVE_TBB
#include <cassert>
#include <sstream>

#ifdef HAVE_TBB
#include <tbb/task_scheduler_init.h>
#include <cassert>
#include <cstdlib>
#include <memory>
#include <sstream>
#endif

namespace tiledb {
namespace sm {
namespace global_state {

/** The TBB scheduler, used for controlling the number of TBB threads. */
static std::unique_ptr<tbb::task_scheduler_init> tbb_scheduler_;

/** The number of TBB threads the scheduler was configured with **/
int tbb_nthreads_;

Status init_tbb(const Config* config) {
int nthreads;
Status get_nthreads(const Config* const config, int* const nthreads) {
if (!config) {
nthreads = std::strtol(Config::SM_NUM_TBB_THREADS.c_str(), nullptr, 10);
*nthreads = std::strtol(Config::SM_NUM_TBB_THREADS.c_str(), nullptr, 10);
} else {
bool found = false;
RETURN_NOT_OK(config->get<int>("sm.num_tbb_threads", &nthreads, &found));
RETURN_NOT_OK(config->get<int>("sm.num_tbb_threads", nthreads, &found));
assert(found);
}

if (nthreads == tbb::task_scheduler_init::automatic) {
nthreads = tbb::task_scheduler_init::default_num_threads();
#ifdef HAVE_TBB
if (*nthreads == tbb::task_scheduler_init::automatic) {
*nthreads = tbb::task_scheduler_init::default_num_threads();
}
#else
if (*nthreads <= 0) {
*nthreads = std::thread::hardware_concurrency();
}
if (nthreads < 1) {
#endif

if (*nthreads < 1) {
std::stringstream msg;
msg << "TBB thread runtime must be initialized with >= 1 threads, got: "
<< nthreads;
<< *nthreads;
return Status::Error(msg.str());
}

return Status::Ok();
}

#ifdef HAVE_TBB

/** The TBB scheduler, used for controlling the number of TBB threads. */
static std::unique_ptr<tbb::task_scheduler_init> tbb_scheduler_;

/** The number of TBB threads the scheduler was configured with **/
int tbb_nthreads_;

Status init_tbb(const Config* const config) {
int nthreads;
RETURN_NOT_OK(get_nthreads(config, &nthreads));

if (!tbb_scheduler_) {
// initialize scheduler in process for a custom number of threads (upon
// first thread calling init_tbb)
Expand Down Expand Up @@ -99,25 +117,39 @@ Status init_tbb(const Config* config) {
return Status::Ok();
}

} // namespace global_state
} // namespace sm
} // namespace tiledb

#else

namespace tiledb {
namespace sm {
namespace global_state {

int tbb_nthreads_ = 0;
/** The ThreadPool to use when TBB is disabled. */
std::shared_ptr<ThreadPool> global_tp_;

Status init_tbb(const Config* config) {
(void)config;
int nthreads;
RETURN_NOT_OK(get_nthreads(config, &nthreads));

if (!global_tp_) {
global_tp_ = std::make_shared<ThreadPool>();
const Status st = global_tp_->init(nthreads);
if (!st.ok()) {
global_tp_ = nullptr;
}
RETURN_NOT_OK(st);
} else {
// If the thread pool has already been initialized, check
// invariants.
if (nthreads != static_cast<int>(global_tp_->concurrency_level())) {
std::stringstream msg;
msg << "Global thread pool must be initialized with the same number of "
"threads: "
<< nthreads << " != " << global_tp_->concurrency_level();
return Status::Error(msg.str());
}
}

return Status::Ok();
}

#endif

} // namespace global_state
} // namespace sm
} // namespace tiledb

#endif
Loading

0 comments on commit 2b47384

Please sign in to comment.