-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
simplify graph signal handler (#3542)
* simplify graph signal handler * style * style * fix some comment Co-authored-by: Yee <[email protected]> Co-authored-by: Sophie <[email protected]>
- Loading branch information
1 parent
fa6d838
commit 321f549
Showing
4 changed files
with
162 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/* Copyright (c) 2021 vesoft inc. All rights reserved. | ||
* | ||
* This source code is licensed under Apache 2.0 License. | ||
*/ | ||
#include "GraphServer.h" | ||
|
||
#include <memory> | ||
#include <utility> | ||
|
||
#include "graph/service/GraphFlags.h" | ||
#include "graph/service/GraphService.h" | ||
namespace nebula { | ||
namespace graph { | ||
|
||
GraphServer::GraphServer(HostAddr localHost) : localHost_(std::move(localHost)) {} | ||
|
||
GraphServer::~GraphServer() { | ||
stop(); | ||
} | ||
|
||
bool GraphServer::start() { | ||
auto threadFactory = std::make_shared<folly::NamedThreadFactory>("graph-netio"); | ||
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_netio_threads, | ||
std::move(threadFactory)); | ||
int numThreads = FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads | ||
: thriftServer_->getNumIOWorkerThreads(); | ||
std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager( | ||
PriorityThreadManager::newPriorityThreadManager(numThreads)); | ||
threadManager->setNamePrefix("executor"); | ||
threadManager->start(); | ||
|
||
thriftServer_ = std::make_unique<apache::thrift::ThriftServer>(); | ||
thriftServer_->setIOThreadPool(ioThreadPool); | ||
|
||
auto interface = std::make_shared<GraphService>(); | ||
auto status = interface->init(ioThreadPool, localHost_); | ||
if (!status.ok()) { | ||
LOG(ERROR) << status; | ||
return false; | ||
} | ||
|
||
graphThread_ = std::make_unique<std::thread>([&] { | ||
thriftServer_->setPort(localHost_.port); | ||
thriftServer_->setInterface(std::move(interface)); | ||
thriftServer_->setReusePort(FLAGS_reuse_port); | ||
thriftServer_->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs)); | ||
thriftServer_->setNumAcceptThreads(FLAGS_num_accept_threads); | ||
thriftServer_->setListenBacklog(FLAGS_listen_backlog); | ||
if (FLAGS_enable_ssl || FLAGS_enable_graph_ssl) { | ||
thriftServer_->setSSLConfig(nebula::sslContextConfig()); | ||
} | ||
thriftServer_->setThreadManager(threadManager); | ||
|
||
serverStatus_.store(STATUS_RUNNING); | ||
FLOG_INFO("Starting nebula-graphd on %s:%d\n", localHost_.host.c_str(), localHost_.port); | ||
try { | ||
thriftServer_->serve(); // Blocking wait until shut down via thriftServer_->stop() | ||
} catch (const std::exception &e) { | ||
FLOG_ERROR("Exception thrown while starting the graph RPC server: %s", e.what()); | ||
} | ||
serverStatus_.store(STATUS_STOPPED); | ||
FLOG_INFO("nebula-graphd on %s:%d has been stopped", localHost_.host.c_str(), localHost_.port); | ||
}); | ||
|
||
while (serverStatus_ == STATUS_UNINITIALIZED) { | ||
std::this_thread::sleep_for(std::chrono::microseconds(100)); | ||
} | ||
return true; | ||
} | ||
|
||
void GraphServer::waitUntilStop() { | ||
{ | ||
std::unique_lock<std::mutex> lkStop(muStop_); | ||
cvStop_.wait(lkStop, [&] { return serverStatus_ != STATUS_RUNNING; }); | ||
} | ||
|
||
thriftServer_->stop(); | ||
|
||
graphThread_->join(); | ||
} | ||
|
||
void GraphServer::notifyStop() { | ||
std::unique_lock<std::mutex> lkStop(muStop_); | ||
if (serverStatus_ == STATUS_RUNNING) { | ||
serverStatus_ = STATUS_STOPPED; | ||
cvStop_.notify_one(); | ||
} | ||
} | ||
|
||
void GraphServer::stop() { | ||
if (serverStatus_.load() == ServiceStatus::STATUS_STOPPED) { | ||
LOG(INFO) << "The graph server has been stopped"; | ||
return; | ||
} | ||
|
||
ServiceStatus serverExpected = ServiceStatus::STATUS_RUNNING; | ||
serverStatus_.compare_exchange_strong(serverExpected, STATUS_STOPPED); | ||
|
||
if (thriftServer_) { | ||
thriftServer_->stop(); | ||
} | ||
} | ||
|
||
} // namespace graph | ||
} // namespace nebula |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* Copyright (c) 2021 vesoft inc. All rights reserved. | ||
* | ||
* This source code is licensed under Apache 2.0 License. | ||
*/ | ||
#include <thrift/lib/cpp2/server/ThriftServer.h> | ||
|
||
#include <cstdint> | ||
#include <mutex> | ||
#include <thread> | ||
|
||
#include "common/base/Base.h" | ||
#include "common/base/SignalHandler.h" | ||
#include "common/network/NetworkUtils.h" | ||
namespace nebula { | ||
namespace graph { | ||
class GraphServer { | ||
public: | ||
explicit GraphServer(HostAddr localHost); | ||
|
||
~GraphServer(); | ||
|
||
// Return false if failed. | ||
bool start(); | ||
|
||
void stop(); | ||
|
||
// used for signal handler to set an internal stop flag | ||
void notifyStop(); | ||
|
||
void waitUntilStop(); | ||
|
||
private: | ||
HostAddr localHost_; | ||
|
||
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_; | ||
std::shared_ptr<apache::thrift::concurrency::ThreadManager> workers_; | ||
std::unique_ptr<apache::thrift::ThriftServer> thriftServer_; | ||
std::unique_ptr<std::thread> graphThread_; | ||
|
||
enum ServiceStatus : uint8_t { STATUS_UNINITIALIZED = 0, STATUS_RUNNING = 1, STATUS_STOPPED = 2 }; | ||
std::atomic<ServiceStatus> serverStatus_{STATUS_UNINITIALIZED}; | ||
std::mutex muStop_; | ||
std::condition_variable cvStop_; | ||
}; | ||
} // namespace graph | ||
} // namespace nebula |