Skip to content

Commit

Permalink
Remove getStats() API from ThreadManager
Browse files Browse the repository at this point in the history
Summary:
ThreadManager is exposing `getStats()` API. And the data it computes  is not used anywhere.
To add insult to the injury , ThriftSever was enabling the collection of these stats by default,  and there is a spin lock involved to collect the stats (see reportTaskStats implementation) which can cause significant CPU costs.  The `getStats` API computes average running and execution time for tasks. But same functionality is currently available using the TM observer as well. Due to these reason, we should reddiff the functionality.

Reviewed By: yfeldblum

Differential Revision: D28488094

fbshipit-source-id: f53a73249b978c993884e6c3802b5d053d3de6fe
  • Loading branch information
mshneer authored and facebook-github-bot committed Jun 29, 2021
1 parent 5976312 commit bfb527c
Show file tree
Hide file tree
Showing 13 changed files with 51 additions and 81 deletions.
64 changes: 25 additions & 39 deletions thrift/lib/cpp/concurrency/ThreadManager.cpp
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -216,14 +216,13 @@ class ThreadManager::Impl : public ThreadManager,
class Worker; class Worker;


public: public:
explicit Impl(bool enableTaskStats = false, size_t numPriorities = 1) explicit Impl(size_t numPriorities = 1)
: workerCount_(0), : workerCount_(0),
intendedWorkerCount_(0), intendedWorkerCount_(0),
idleCount_(0), idleCount_(0),
totalTaskCount_(0), totalTaskCount_(0),
expiredCount_(0), expiredCount_(0),
workersToStop_(0), workersToStop_(0),
enableTaskStats_(enableTaskStats),
state_(ThreadManager::UNINITIALIZED), state_(ThreadManager::UNINITIALIZED),
tasks_(N_SOURCES * numPriorities), tasks_(N_SOURCES * numPriorities),
deadWorkers_(), deadWorkers_(),
Expand Down Expand Up @@ -372,8 +371,6 @@ class ThreadManager::Impl : public ThreadManager,
size_t expiredCount_; size_t expiredCount_;
std::atomic<int> workersToStop_; std::atomic<int> workersToStop_;


const bool enableTaskStats_;

ExpireCallback expireCallback_; ExpireCallback expireCallback_;
ExpireCallback codelCallback_; ExpireCallback codelCallback_;
InitCallback initCallback_; InitCallback initCallback_;
Expand Down Expand Up @@ -411,9 +408,8 @@ namespace {


class SimpleThreadManagerImpl : public ThreadManager::Impl { class SimpleThreadManagerImpl : public ThreadManager::Impl {
public: public:
explicit SimpleThreadManagerImpl( explicit SimpleThreadManagerImpl(size_t workerCount = 4)
size_t workerCount = 4, bool enableTaskStats = false) : ThreadManager::Impl(), workerCount_(workerCount) {
: ThreadManager::Impl(enableTaskStats), workerCount_(workerCount) {
executors_.reserve(N_SOURCES); executors_.reserve(N_SOURCES);
// for INTERNAL source, this is just a straight pass through // for INTERNAL source, this is just a straight pass through
executors_.emplace_back(this).get_deleter().unown(); executors_.emplace_back(this).get_deleter().unown();
Expand Down Expand Up @@ -454,10 +450,8 @@ class SimpleThreadManagerImpl : public ThreadManager::Impl {


} // namespace } // namespace


SimpleThreadManager::SimpleThreadManager( SimpleThreadManager::SimpleThreadManager(size_t workerCount)
size_t workerCount, bool enableTaskStats) : impl_(std::make_unique<SimpleThreadManagerImpl>(workerCount)) {}
: impl_(std::make_unique<SimpleThreadManagerImpl>(
workerCount, enableTaskStats)) {}


SimpleThreadManager::~SimpleThreadManager() { SimpleThreadManager::~SimpleThreadManager() {
joinKeepAliveOnce(); joinKeepAliveOnce();
Expand Down Expand Up @@ -547,10 +541,12 @@ void SimpleThreadManager::setCodelCallback(ExpireCallback cb) {
void SimpleThreadManager::setThreadInitCallback(InitCallback cb) { void SimpleThreadManager::setThreadInitCallback(InitCallback cb) {
return impl_->setThreadInitCallback(std::move(cb)); return impl_->setThreadInitCallback(std::move(cb));
} }

void SimpleThreadManager::addTaskObserver( void SimpleThreadManager::addTaskObserver(
std::shared_ptr<ThreadManager::Observer> observer) { std::shared_ptr<ThreadManager::Observer> observer) {
impl_->addTaskObserver(std::move(observer)); impl_->addTaskObserver(std::move(observer));
} }

folly::Executor::KeepAlive<> SimpleThreadManager::getKeepAlive( folly::Executor::KeepAlive<> SimpleThreadManager::getKeepAlive(
ExecutionScope es, Source source) const { ExecutionScope es, Source source) const {
return impl_->getKeepAlive(std::move(es), source); return impl_->getKeepAlive(std::move(es), source);
Expand Down Expand Up @@ -1016,14 +1012,12 @@ class PriorityThreadManager::PriorityImpl
: public PriorityThreadManager, : public PriorityThreadManager,
public folly::DefaultKeepAliveExecutor { public folly::DefaultKeepAliveExecutor {
public: public:
explicit PriorityImpl( explicit PriorityImpl(const std::array<
const std::array< std::pair<std::shared_ptr<ThreadFactory>, size_t>,
std::pair<std::shared_ptr<ThreadFactory>, size_t>, N_PRIORITIES>& factories) {
N_PRIORITIES>& factories,
bool enableTaskStats = false) {
executors_.reserve(N_PRIORITIES * N_SOURCES); executors_.reserve(N_PRIORITIES * N_SOURCES);
for (int i = 0; i < N_PRIORITIES; i++) { for (int i = 0; i < N_PRIORITIES; i++) {
auto m = std::make_unique<ThreadManager::Impl>(enableTaskStats); auto m = std::make_unique<ThreadManager::Impl>();
// for INTERNAL source, this is just a straight pass through // for INTERNAL source, this is just a straight pass through
executors_.emplace_back(m.get()).get_deleter().unown(); executors_.emplace_back(m.get()).get_deleter().unown();
for (int j = 1; j < N_SOURCES; j++) { for (int j = 1; j < N_SOURCES; j++) {
Expand Down Expand Up @@ -1286,10 +1280,8 @@ namespace {
class PriorityQueueThreadManager : public ThreadManager::Impl { class PriorityQueueThreadManager : public ThreadManager::Impl {
public: public:
typedef apache::thrift::concurrency::PRIORITY PRIORITY; typedef apache::thrift::concurrency::PRIORITY PRIORITY;
explicit PriorityQueueThreadManager( explicit PriorityQueueThreadManager(size_t numThreads)
size_t numThreads, bool enableTaskStats = false) : ThreadManager::Impl(N_PRIORITIES), numThreads_(numThreads) {
: ThreadManager::Impl(enableTaskStats, N_PRIORITIES),
numThreads_(numThreads) {
for (int i = 0; i < N_PRIORITIES; i++) { for (int i = 0; i < N_PRIORITIES; i++) {
for (int j = 0; j < N_SOURCES; j++) { for (int j = 0; j < N_SOURCES; j++) {
executors_.emplace_back( executors_.emplace_back(
Expand Down Expand Up @@ -1577,24 +1569,22 @@ void ThreadManager::Impl::addTaskObserver(
} }


std::shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager( std::shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(
size_t count, bool enableTaskStats) { size_t count) {
auto tm = std::make_shared<SimpleThreadManagerImpl>(count, enableTaskStats); auto tm = std::make_shared<SimpleThreadManagerImpl>(count);
tm->threadFactory(Factory(PosixThreadFactory::NORMAL_PRI)); tm->threadFactory(Factory(PosixThreadFactory::NORMAL_PRI));
return tm; return tm;
} }


std::shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager( std::shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(
const std::string& name, size_t count, bool enableTaskStats) { const std::string& name, size_t count) {
auto simpleThreadManager = auto simpleThreadManager = std::make_shared<SimpleThreadManagerImpl>(count);
std::make_shared<SimpleThreadManagerImpl>(count, enableTaskStats);
simpleThreadManager->setNamePrefix(name); simpleThreadManager->setNamePrefix(name);
return simpleThreadManager; return simpleThreadManager;
} }


std::shared_ptr<ThreadManager> ThreadManager::newPriorityQueueThreadManager( std::shared_ptr<ThreadManager> ThreadManager::newPriorityQueueThreadManager(
size_t numThreads, bool enableTaskStats) { size_t numThreads) {
auto tm = auto tm = std::make_shared<PriorityQueueThreadManager>(numThreads);
std::make_shared<PriorityQueueThreadManager>(numThreads, enableTaskStats);
tm->threadFactory(Factory(PosixThreadFactory::NORMAL_PRI)); tm->threadFactory(Factory(PosixThreadFactory::NORMAL_PRI));
return tm; return tm;
} }
Expand All @@ -1603,21 +1593,19 @@ std::shared_ptr<PriorityThreadManager>
PriorityThreadManager::newPriorityThreadManager( PriorityThreadManager::newPriorityThreadManager(
const std::array< const std::array<
std::pair<std::shared_ptr<ThreadFactory>, size_t>, std::pair<std::shared_ptr<ThreadFactory>, size_t>,
N_PRIORITIES>& factories, N_PRIORITIES>& factories) {
bool enableTaskStats) {
auto copy = factories; auto copy = factories;
if (copy[PRIORITY::NORMAL].second < NORMAL_PRIORITY_MINIMUM_THREADS) { if (copy[PRIORITY::NORMAL].second < NORMAL_PRIORITY_MINIMUM_THREADS) {
LOG(INFO) << "Creating minimum threads of NORMAL priority: " LOG(INFO) << "Creating minimum threads of NORMAL priority: "
<< NORMAL_PRIORITY_MINIMUM_THREADS; << NORMAL_PRIORITY_MINIMUM_THREADS;
copy[PRIORITY::NORMAL].second = NORMAL_PRIORITY_MINIMUM_THREADS; copy[PRIORITY::NORMAL].second = NORMAL_PRIORITY_MINIMUM_THREADS;
} }
return std::make_shared<PriorityThreadManager::PriorityImpl>( return std::make_shared<PriorityThreadManager::PriorityImpl>(copy);
copy, enableTaskStats);
} }


std::shared_ptr<PriorityThreadManager> std::shared_ptr<PriorityThreadManager>
PriorityThreadManager::newPriorityThreadManager( PriorityThreadManager::newPriorityThreadManager(
const std::array<size_t, N_PRIORITIES>& counts, bool enableTaskStats) { const std::array<size_t, N_PRIORITIES>& counts) {
static_assert(N_PRIORITIES == 5, "Implementation is out-of-date"); static_assert(N_PRIORITIES == 5, "Implementation is out-of-date");
// Note that priorities for HIGH and IMPORTANT are the same, the difference // Note that priorities for HIGH and IMPORTANT are the same, the difference
// is in the number of threads. // is in the number of threads.
Expand All @@ -1632,14 +1620,12 @@ PriorityThreadManager::newPriorityThreadManager(
{Factory(PosixThreadFactory::NORMAL_PRI), counts[PRIORITY::NORMAL]}, {Factory(PosixThreadFactory::NORMAL_PRI), counts[PRIORITY::NORMAL]},
{Factory(PosixThreadFactory::LOW_PRI), counts[PRIORITY::BEST_EFFORT]}, {Factory(PosixThreadFactory::LOW_PRI), counts[PRIORITY::BEST_EFFORT]},
}}; }};
return newPriorityThreadManager(factories, enableTaskStats); return newPriorityThreadManager(factories);
} }


std::shared_ptr<PriorityThreadManager> std::shared_ptr<PriorityThreadManager>
PriorityThreadManager::newPriorityThreadManager( PriorityThreadManager::newPriorityThreadManager(size_t normalThreadsCount) {
size_t normalThreadsCount, bool enableTaskStats) { return newPriorityThreadManager({{2, 2, 2, normalThreadsCount, 2}});
return newPriorityThreadManager(
{{2, 2, 2, normalThreadsCount, 2}}, enableTaskStats);
} }


} // namespace concurrency } // namespace concurrency
Expand Down
18 changes: 7 additions & 11 deletions thrift/lib/cpp/concurrency/ThreadManager.h
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -234,22 +234,22 @@ class ThreadManager : public virtual folly::Executor {
* Creates a simple thread manager that uses count number of worker threads * Creates a simple thread manager that uses count number of worker threads
*/ */
static std::shared_ptr<ThreadManager> newSimpleThreadManager( static std::shared_ptr<ThreadManager> newSimpleThreadManager(
size_t count = 4, bool enableTaskStats = false); size_t count = 4);


/** /**
* Creates a simple thread manager that uses count number of worker threads * Creates a simple thread manager that uses count number of worker threads
* and sets the name prefix * and sets the name prefix
*/ */
static std::shared_ptr<ThreadManager> newSimpleThreadManager( static std::shared_ptr<ThreadManager> newSimpleThreadManager(
const std::string& name, size_t count = 4, bool enableTaskStats = false); const std::string& name, size_t count = 4);


/** /**
* Creates a thread manager with support for priorities. Unlike * Creates a thread manager with support for priorities. Unlike
* PriorityThreadManager, requests are still served from a single * PriorityThreadManager, requests are still served from a single
* thread pool. * thread pool.
*/ */
static std::shared_ptr<ThreadManager> newPriorityQueueThreadManager( static std::shared_ptr<ThreadManager> newPriorityQueueThreadManager(
size_t numThreads, bool enableTaskStats = false); size_t numThreads);


struct RunStats { struct RunStats {
const std::string& threadPoolName; const std::string& threadPoolName;
Expand Down Expand Up @@ -366,16 +366,14 @@ class PriorityThreadManager : public ThreadManager {
static std::shared_ptr<PriorityThreadManager> newPriorityThreadManager( static std::shared_ptr<PriorityThreadManager> newPriorityThreadManager(
const std::array< const std::array<
std::pair<std::shared_ptr<ThreadFactory>, size_t>, std::pair<std::shared_ptr<ThreadFactory>, size_t>,
N_PRIORITIES>& counts, N_PRIORITIES>& counts);
bool enableTaskStats = false);


/** /**
* Creates a priority-aware thread manager that uses counts[X] * Creates a priority-aware thread manager that uses counts[X]
* worker threads for priority X. * worker threads for priority X.
*/ */
static std::shared_ptr<PriorityThreadManager> newPriorityThreadManager( static std::shared_ptr<PriorityThreadManager> newPriorityThreadManager(
const std::array<size_t, N_PRIORITIES>& counts, const std::array<size_t, N_PRIORITIES>& counts);
bool enableTaskStats = false);


/** /**
* Creates a priority-aware thread manager that uses normalThreadsCount * Creates a priority-aware thread manager that uses normalThreadsCount
Expand All @@ -387,8 +385,7 @@ class PriorityThreadManager : public ThreadManager {
* to the number of CPUs on the system * to the number of CPUs on the system
*/ */
static std::shared_ptr<PriorityThreadManager> newPriorityThreadManager( static std::shared_ptr<PriorityThreadManager> newPriorityThreadManager(
size_t normalThreadsCount = sysconf(_SC_NPROCESSORS_ONLN), size_t normalThreadsCount = sysconf(_SC_NPROCESSORS_ONLN));
bool enableTaskStats = false);


class PriorityImpl; class PriorityImpl;
}; };
Expand Down Expand Up @@ -462,8 +459,7 @@ class ThreadManagerExecutorAdapter : public ThreadManager,
class SimpleThreadManager : public ThreadManager, class SimpleThreadManager : public ThreadManager,
public folly::DefaultKeepAliveExecutor { public folly::DefaultKeepAliveExecutor {
public: public:
explicit SimpleThreadManager( explicit SimpleThreadManager(size_t workerCount = 4);
size_t workerCount = 4, bool enableTaskStats = false);
~SimpleThreadManager() override; ~SimpleThreadManager() override;


void start() override; void start() override;
Expand Down
10 changes: 4 additions & 6 deletions thrift/lib/cpp/concurrency/test/ThreadManagerTests.cpp
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -784,8 +784,7 @@ TEST_F(ThreadManagerTest, PriorityThreadManagerWorkerCount) {
} }


TEST_F(ThreadManagerTest, PriorityQueueThreadManagerExecutor) { TEST_F(ThreadManagerTest, PriorityQueueThreadManagerExecutor) {
auto threadManager = auto threadManager = ThreadManager::newPriorityQueueThreadManager(1);
ThreadManager::newPriorityQueueThreadManager(1, true /*stats*/);
threadManager->start(); threadManager->start();
folly::Baton<> reqSyncBaton; folly::Baton<> reqSyncBaton;
folly::Baton<> reqDoneBaton; folly::Baton<> reqDoneBaton;
Expand Down Expand Up @@ -815,10 +814,9 @@ TEST_F(ThreadManagerTest, PriorityQueueThreadManagerExecutor) {
std::array<std::function<std::shared_ptr<ThreadManager>()>, 3> factories = { std::array<std::function<std::shared_ptr<ThreadManager>()>, 3> factories = {
std::bind( std::bind(
(std::shared_ptr<ThreadManager>(*)( (std::shared_ptr<ThreadManager>(*)(
size_t, bool))ThreadManager::newSimpleThreadManager, size_t))ThreadManager::newSimpleThreadManager,
1, 1),
false), std::bind(ThreadManager::newPriorityQueueThreadManager, 1),
std::bind(ThreadManager::newPriorityQueueThreadManager, 1, false),
[]() -> std::shared_ptr<apache::thrift::concurrency::ThreadManager> { []() -> std::shared_ptr<apache::thrift::concurrency::ThreadManager> {
return PriorityThreadManager::newPriorityThreadManager({{ return PriorityThreadManager::newPriorityThreadManager({{
1 /*HIGH_IMPORTANT*/, 1 /*HIGH_IMPORTANT*/,
Expand Down
2 changes: 1 addition & 1 deletion thrift/lib/cpp2/test/CoroutineTest.cpp
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ TEST(CoroutineExceptionTest, completesHandlerCallback) {
CoroutineServiceHandlerThrowing handler; CoroutineServiceHandlerThrowing handler;


folly::ScopedEventBaseThread ebt; folly::ScopedEventBaseThread ebt;
auto tm = ThreadManager::newSimpleThreadManager(1, true); auto tm = ThreadManager::newSimpleThreadManager(1);


apache::thrift::Cpp2RequestContext cpp2reqCtx(nullptr); apache::thrift::Cpp2RequestContext cpp2reqCtx(nullptr);
auto cb = std::make_unique< auto cb = std::make_unique<
Expand Down
2 changes: 1 addition & 1 deletion thrift/lib/cpp2/test/FutureTest.cpp
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ TEST(ThriftServer, FutureGetOrderTest) {
using std::chrono::steady_clock; using std::chrono::steady_clock;


auto thf = std::make_shared<PosixThreadFactory>(); auto thf = std::make_shared<PosixThreadFactory>();
auto thm = ThreadManager::newSimpleThreadManager(1, false); auto thm = ThreadManager::newSimpleThreadManager(1);
thm->threadFactory(thf); thm->threadFactory(thf);
thm->start(); thm->start();
apache::thrift::TestThriftServerFactory<TestInterface> factory; apache::thrift::TestThriftServerFactory<TestInterface> factory;
Expand Down
2 changes: 1 addition & 1 deletion thrift/lib/cpp2/test/HTTPClientChannelTest.cpp
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ std::unique_ptr<HTTP2RoutingHandler> createHTTP2RoutingHandler(


std::shared_ptr<BaseThriftServer> createHttpServer() { std::shared_ptr<BaseThriftServer> createHttpServer() {
auto handler = std::make_shared<TestServiceHandler>(); auto handler = std::make_shared<TestServiceHandler>();
auto tm = ThreadManager::newSimpleThreadManager(1, false); auto tm = ThreadManager::newSimpleThreadManager(1);
tm->threadFactory(std::make_shared<PosixThreadFactory>()); tm->threadFactory(std::make_shared<PosixThreadFactory>());
tm->start(); tm->start();
auto server = std::make_shared<ThriftServer>(); auto server = std::make_shared<ThriftServer>();
Expand Down
2 changes: 1 addition & 1 deletion thrift/lib/cpp2/test/InstrumentationTest.cpp
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class DebuggingFrameHandler : public rocket::SetupFrameHandler {
reqRegistry_([] { return new RequestsRegistry(0, 0, 0); }) { reqRegistry_([] { return new RequestsRegistry(0, 0, 0); }) {
auto tf = auto tf =
std::make_shared<PosixThreadFactory>(PosixThreadFactory::ATTACHED); std::make_shared<PosixThreadFactory>(PosixThreadFactory::ATTACHED);
tm_ = std::make_shared<SimpleThreadManager>(1, false); tm_ = std::make_shared<SimpleThreadManager>(1);
tm_->setNamePrefix("DebugInterface"); tm_->setNamePrefix("DebugInterface");
tm_->threadFactory(move(tf)); tm_->threadFactory(move(tf));
tm_->start(); tm_->start();
Expand Down
2 changes: 1 addition & 1 deletion thrift/lib/cpp2/test/server/ThriftServerTest.cpp
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -2627,7 +2627,7 @@ TEST_P(HeaderOrRocket, OnStartStopServingTest) {
auto tf = std::make_shared<PosixThreadFactory>( auto tf = std::make_shared<PosixThreadFactory>(
PosixThreadFactory::ATTACHED); PosixThreadFactory::ATTACHED);
// We need at least 2 threads for the test // We need at least 2 threads for the test
auto tm = ThreadManager::newSimpleThreadManager(2, false); auto tm = ThreadManager::newSimpleThreadManager(2);
tm->threadFactory(move(tf)); tm->threadFactory(move(tf));
tm->start(); tm->start();
ts.setThreadManager(tm); ts.setThreadManager(tm);
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -516,8 +516,7 @@ TYPED_TEST(ScopedServerInterfaceThreadTest, joinRequestsRestartServer) {
auto tf = make_shared<apache::thrift::concurrency::PosixThreadFactory>( auto tf = make_shared<apache::thrift::concurrency::PosixThreadFactory>(
apache::thrift::concurrency::PosixThreadFactory::ATTACHED); apache::thrift::concurrency::PosixThreadFactory::ATTACHED);
auto tm = auto tm =
apache::thrift::concurrency::ThreadManager::newSimpleThreadManager( apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(1);
1, false);
tm->threadFactory(move(tf)); tm->threadFactory(move(tf));
tm->start(); tm->start();
ts->setAddress({"::1", 0}); ts->setAddress({"::1", 0});
Expand Down
3 changes: 1 addition & 2 deletions thrift/lib/cpp2/test/util/TestThriftServerFactory.h
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ struct TestThriftServerFactory : public TestServerFactory {
auto threadFactory = auto threadFactory =
std::make_shared<apache::thrift::concurrency::PosixThreadFactory>(); std::make_shared<apache::thrift::concurrency::PosixThreadFactory>();
auto threadManager = auto threadManager =
apache::thrift::concurrency::ThreadManager::newSimpleThreadManager( apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(1);
1, false);
threadManager->threadFactory(threadFactory); threadManager->threadFactory(threadFactory);
threadManager->start(); threadManager->start();
server->setThreadManager(threadManager); server->setThreadManager(threadManager);
Expand Down
2 changes: 1 addition & 1 deletion thrift/lib/cpp2/util/ScopedServerInterfaceThread.cpp
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ ScopedServerInterfaceThread::ScopedServerInterfaceThread(
SocketAddress const& addr, SocketAddress const& addr,
ServerConfigCb configCb) { ServerConfigCb configCb) {
auto tf = make_shared<PosixThreadFactory>(PosixThreadFactory::ATTACHED); auto tf = make_shared<PosixThreadFactory>(PosixThreadFactory::ATTACHED);
auto tm = ThreadManager::newSimpleThreadManager(1, false); auto tm = ThreadManager::newSimpleThreadManager(1);
tm->threadFactory(move(tf)); tm->threadFactory(move(tf));
tm->start(); tm->start();
auto ts = make_shared<ThriftServer>(); auto ts = make_shared<ThriftServer>();
Expand Down
21 changes: 7 additions & 14 deletions thrift/lib/py/server/CppServerWrapper.cpp
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -605,8 +605,8 @@ class CppServerWrapper : public ThriftServer {
std::make_shared<CppServerEventHandler>(serverEventHandler)); std::make_shared<CppServerEventHandler>(serverEventHandler));
} }


void setNewSimpleThreadManager(size_t count, size_t, bool enableTaskStats) { void setNewSimpleThreadManager(size_t count, size_t) {
auto tm = ThreadManager::newSimpleThreadManager(count, enableTaskStats); auto tm = ThreadManager::newSimpleThreadManager(count);
auto poolThreadName = getCPUWorkerThreadName(); auto poolThreadName = getCPUWorkerThreadName();
if (!poolThreadName.empty()) { if (!poolThreadName.empty()) {
tm->setNamePrefix(poolThreadName); tm->setNamePrefix(poolThreadName);
Expand All @@ -617,10 +617,8 @@ class CppServerWrapper : public ThriftServer {
setThreadManager(std::move(tm)); setThreadManager(std::move(tm));
} }


void setNewPriorityQueueThreadManager( void setNewPriorityQueueThreadManager(size_t numThreads) {
size_t numThreads, bool enableTaskStats) { auto tm = ThreadManager::newPriorityQueueThreadManager(numThreads);
auto tm = ThreadManager::newPriorityQueueThreadManager(
numThreads, enableTaskStats);
auto poolThreadName = getCPUWorkerThreadName(); auto poolThreadName = getCPUWorkerThreadName();
if (!poolThreadName.empty()) { if (!poolThreadName.empty()) {
tm->setNamePrefix(poolThreadName); tm->setNamePrefix(poolThreadName);
Expand All @@ -637,11 +635,9 @@ class CppServerWrapper : public ThriftServer {
size_t important, size_t important,
size_t normal, size_t normal,
size_t best_effort, size_t best_effort,
bool enableTaskStats,
size_t) { size_t) {
auto tm = PriorityThreadManager::newPriorityThreadManager( auto tm = PriorityThreadManager::newPriorityThreadManager(
{{high_important, high, important, normal, best_effort}}, {{high_important, high, important, normal, best_effort}});
enableTaskStats);
tm->enableCodel(getEnableCodel()); tm->enableCodel(getEnableCodel());
auto poolThreadName = getCPUWorkerThreadName(); auto poolThreadName = getCPUWorkerThreadName();
if (!poolThreadName.empty()) { if (!poolThreadName.empty()) {
Expand Down Expand Up @@ -743,13 +739,11 @@ BOOST_PYTHON_MODULE(CppServerWrapper) {
.def( .def(
"setNewSimpleThreadManager", "setNewSimpleThreadManager",
&CppServerWrapper::setNewSimpleThreadManager, &CppServerWrapper::setNewSimpleThreadManager,
(arg("count"), (arg("count"), arg("pendingTaskCountMax")))
arg("pendingTaskCountMax"),
arg("enableTaskStats") = false))
.def( .def(
"setNewPriorityQueueThreadManager", "setNewPriorityQueueThreadManager",
&CppServerWrapper::setNewPriorityQueueThreadManager, &CppServerWrapper::setNewPriorityQueueThreadManager,
(arg("numThreads"), arg("enableTaskStats") = false)) (arg("numThreads")))
.def( .def(
"setNewPriorityThreadManager", "setNewPriorityThreadManager",
&CppServerWrapper::setNewPriorityThreadManager, &CppServerWrapper::setNewPriorityThreadManager,
Expand All @@ -758,7 +752,6 @@ BOOST_PYTHON_MODULE(CppServerWrapper) {
arg("important"), arg("important"),
arg("normal"), arg("normal"),
arg("best_effort"), arg("best_effort"),
arg("enableTaskStats") = false,
arg("maxQueueLen") = 0)) arg("maxQueueLen") = 0))
.def("setCppSSLConfig", &CppServerWrapper::setCppSSLConfig) .def("setCppSSLConfig", &CppServerWrapper::setCppSSLConfig)
.def("setCppSSLCacheOptions", &CppServerWrapper::setCppSSLCacheOptions) .def("setCppSSLCacheOptions", &CppServerWrapper::setCppSSLCacheOptions)
Expand Down
Loading

0 comments on commit bfb527c

Please sign in to comment.