-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathQueryEngine.cpp
83 lines (68 loc) · 2.96 KB
/
QueryEngine.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#include "graph/service/QueryEngine.h"
#include "common/base/Base.h"
#include "common/memory/MemoryUtils.h"
#include "common/meta/ServerBasedIndexManager.h"
#include "common/meta/ServerBasedSchemaManager.h"
#include "graph/context/QueryContext.h"
#include "graph/optimizer/OptRule.h"
#include "graph/planner/PlannersRegister.h"
#include "graph/service/GraphFlags.h"
#include "graph/service/QueryInstance.h"
#include "version/Version.h"
DECLARE_bool(local_config);
DECLARE_bool(enable_optimizer);
DECLARE_string(meta_server_addrs);
DEFINE_int32(check_memory_interval_in_secs, 1, "Memory check interval in seconds");
namespace nebula {
namespace graph {
Status QueryEngine::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor,
meta::MetaClient* metaClient) {
metaClient_ = metaClient;
schemaManager_ = meta::ServerBasedSchemaManager::create(metaClient_);
indexManager_ = meta::ServerBasedIndexManager::create(metaClient_);
storage_ = std::make_unique<storage::StorageClient>(ioExecutor, metaClient_);
charsetInfo_ = CharsetInfo::instance();
PlannersRegister::registerPlanners();
// Set default optimizer rules
std::vector<const opt::RuleSet*> rulesets{&opt::RuleSet::DefaultRules()};
if (FLAGS_enable_optimizer) {
rulesets.emplace_back(&opt::RuleSet::QueryRules0());
rulesets.emplace_back(&opt::RuleSet::QueryRules());
}
optimizer_ = std::make_unique<opt::Optimizer>(rulesets);
return setupMemoryMonitorThread();
}
// Create query context and query instance and execute it
void QueryEngine::execute(RequestContextPtr rctx) {
auto qctx = std::make_unique<QueryContext>(std::move(rctx),
schemaManager_.get(),
indexManager_.get(),
storage_.get(),
metaClient_,
charsetInfo_);
auto* instance = new QueryInstance(std::move(qctx), optimizer_.get());
instance->execute();
}
Status QueryEngine::setupMemoryMonitorThread() {
memoryMonitorThread_ = std::make_unique<thread::GenericWorker>();
if (!memoryMonitorThread_ || !memoryMonitorThread_->start("graph-memory-monitor")) {
return Status::Error("Fail to start query engine background thread.");
}
auto updateMemoryWatermark = []() -> Status {
auto status = memory::MemoryUtils::hitsHighWatermark();
NG_RETURN_IF_ERROR(status);
memory::MemoryUtils::kHitMemoryHighWatermark.store(std::move(status).value());
return Status::OK();
};
// Just to test whether to get the right memory info
NG_RETURN_IF_ERROR(updateMemoryWatermark());
auto ms = FLAGS_check_memory_interval_in_secs * 1000;
memoryMonitorThread_->addRepeatTask(ms, updateMemoryWatermark);
return Status::OK();
}
} // namespace graph
} // namespace nebula