Skip to content

Commit

Permalink
Merge branch 'master' into fix-drop-host
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet authored Sep 23, 2021
2 parents 807b5db + 8fbb935 commit f0ac932
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 81 deletions.
15 changes: 11 additions & 4 deletions src/graph/executor/admin/ListRolesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,17 @@ folly::Future<Status> ListRolesExecutor::listRoles() {
auto foundItem = std::find_if(items.begin(), items.end(), [&account](const auto &item) {
return item.get_user_id() == account;
});
if (foundItem != items.end() && foundItem->get_role_type() != meta::cpp2::RoleType::ADMIN) {
v.emplace_back(Row({foundItem->get_user_id(),
apache::thrift::util::enumNameSafe(foundItem->get_role_type())}));
} else {
if (foundItem != items.end()) {
if (foundItem->get_role_type() != meta::cpp2::RoleType::ADMIN) {
v.emplace_back(Row({foundItem->get_user_id(),
apache::thrift::util::enumNameSafe(foundItem->get_role_type())}));
} else {
for (const auto &item : items) {
v.emplace_back(nebula::Row(
{item.get_user_id(), apache::thrift::util::enumNameSafe(item.get_role_type())}));
}
}
} else if (qctx_->rctx()->session()->isGod()) {
for (const auto &item : items) {
v.emplace_back(nebula::Row(
{item.get_user_id(), apache::thrift::util::enumNameSafe(item.get_role_type())}));
Expand Down
2 changes: 1 addition & 1 deletion src/graph/session/ClientSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void ClientSession::deleteQuery(QueryContext* qctx) {
session_.queries_ref()->erase(epId);
}

bool ClientSession::findQuery(nebula::ExecutionPlanID epId) {
bool ClientSession::findQuery(nebula::ExecutionPlanID epId) const {
folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_);
auto context = contexts_.find(epId);
if (context != contexts_.end()) {
Expand Down
24 changes: 12 additions & 12 deletions src/graph/session/ClientSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ class ClientSession final {
static std::shared_ptr<ClientSession> create(meta::cpp2::Session&& session,
meta::MetaClient* metaClient);

int64_t id() {
int64_t id() const {
folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_);
return session_.get_session_id();
}

const SpaceInfo space() {
const SpaceInfo& space() const {
folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_);
return space_;
}
Expand All @@ -47,22 +47,22 @@ class ClientSession final {
}
}

const std::string spaceName() {
const std::string& spaceName() const {
folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_);
return session_.get_space_name();
}

const std::string user() {
const std::string& user() const {
folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_);
return session_.get_user_name();
}

std::unordered_map<GraphSpaceID, meta::cpp2::RoleType> roles() {
const std::unordered_map<GraphSpaceID, meta::cpp2::RoleType>& roles() const {
folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_);
return roles_;
}

StatusOr<meta::cpp2::RoleType> roleWithSpace(GraphSpaceID space) {
StatusOr<meta::cpp2::RoleType> roleWithSpace(GraphSpaceID space) const {
folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_);
auto ret = roles_.find(space);
if (ret == roles_.end()) {
Expand All @@ -71,7 +71,7 @@ class ClientSession final {
return ret->second;
}

bool isGod() {
bool isGod() const {
folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_);
// Cloud may have multiple God accounts
for (auto& role : roles_) {
Expand All @@ -91,12 +91,12 @@ class ClientSession final {

void charge();

int32_t getTimezone() {
int32_t getTimezone() const {
folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_);
return session_.get_timezone();
}

HostAddr getGraphAddr() {
const HostAddr& getGraphAddr() const {
folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_);
return session_.get_graph_addr();
}
Expand All @@ -120,7 +120,7 @@ class ClientSession final {
}
}

const meta::cpp2::Session getSession() {
const meta::cpp2::Session& getSession() const {
folly::RWSpinLock::ReadHolder rHolder(rwSpinLock_);
return session_;
}
Expand All @@ -134,7 +134,7 @@ class ClientSession final {

void deleteQuery(QueryContext* qctx);

bool findQuery(nebula::ExecutionPlanID epId);
bool findQuery(nebula::ExecutionPlanID epId) const;

void markQueryKilled(nebula::ExecutionPlanID epId);

Expand All @@ -150,7 +150,7 @@ class ClientSession final {
time::Duration idleDuration_;
meta::cpp2::Session session_;
meta::MetaClient* metaClient_{nullptr};
folly::RWSpinLock rwSpinLock_;
mutable folly::RWSpinLock rwSpinLock_;
/*
* map<spaceId, role>
* One user can have roles in multiple spaces
Expand Down
10 changes: 5 additions & 5 deletions src/storage/exec/IndexEdgeNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class IndexEdgeNode final : public RelNode<T> {
IndexScanNode<T>* indexScanNode,
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
const std::string& schemaName,
int64_t limit)
int64_t limit = -1)
: context_(context),
indexScanNode_(indexScanNode),
schemas_(schemas),
Expand All @@ -42,11 +42,7 @@ class IndexEdgeNode final : public RelNode<T> {
data_.clear();
std::vector<storage::cpp2::EdgeKey> edges;
auto* iter = static_cast<EdgeIndexIterator*>(indexScanNode_->iterator());
int64_t count = 0;
while (iter && iter->valid()) {
if (limit_ > -1 && count++ == limit_) {
break;
}
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
Expand All @@ -66,6 +62,7 @@ class IndexEdgeNode final : public RelNode<T> {
edges.emplace_back(std::move(edge));
iter->next();
}
int64_t count = 0;
for (const auto& edge : edges) {
auto key = NebulaKeyUtils::edgeKey(context_->vIdLen(),
partId,
Expand All @@ -82,6 +79,9 @@ class IndexEdgeNode final : public RelNode<T> {
} else {
return ret;
}
if (limit_ > 0 && ++count >= limit_) {
break;
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
Expand Down
39 changes: 29 additions & 10 deletions src/storage/exec/IndexFilterNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ class IndexFilterNode final : public RelNode<T> {
// data anymore.
IndexFilterNode(RuntimeContext* context,
IndexScanNode<T>* indexScanNode,
StorageExpressionContext* exprCtx = nullptr,
Expression* exp = nullptr,
bool isEdge = false)
StorageExpressionContext* exprCtx,
Expression* exp,
bool isEdge,
int64_t limit = -1)
: context_(context),
indexScanNode_(indexScanNode),
exprCtx_(exprCtx),
filterExp_(exp),
isEdge_(isEdge) {
isEdge_(isEdge),
limit_(limit) {
evalExprByIndex_ = true;
RelNode<T>::name_ = "IndexFilterNode";
}
Expand All @@ -42,9 +44,14 @@ class IndexFilterNode final : public RelNode<T> {
// need to read data.
IndexFilterNode(RuntimeContext* context,
IndexEdgeNode<T>* indexEdgeNode,
StorageExpressionContext* exprCtx = nullptr,
Expression* exp = nullptr)
: context_(context), indexEdgeNode_(indexEdgeNode), exprCtx_(exprCtx), filterExp_(exp) {
StorageExpressionContext* exprCtx,
Expression* exp,
int64_t limit = -1)
: context_(context),
indexEdgeNode_(indexEdgeNode),
exprCtx_(exprCtx),
filterExp_(exp),
limit_(limit) {
evalExprByIndex_ = false;
isEdge_ = true;
}
Expand All @@ -53,9 +60,14 @@ class IndexFilterNode final : public RelNode<T> {
// need to read data.
IndexFilterNode(RuntimeContext* context,
IndexVertexNode<T>* indexVertexNode,
StorageExpressionContext* exprCtx = nullptr,
Expression* exp = nullptr)
: context_(context), indexVertexNode_(indexVertexNode), exprCtx_(exprCtx), filterExp_(exp) {
StorageExpressionContext* exprCtx,
Expression* exp,
int64_t limit = -1)
: context_(context),
indexVertexNode_(indexVertexNode),
exprCtx_(exprCtx),
filterExp_(exp),
limit_(limit) {
evalExprByIndex_ = false;
isEdge_ = false;
}
Expand All @@ -74,13 +86,15 @@ class IndexFilterNode final : public RelNode<T> {
} else {
data = indexVertexNode_->moveData();
}
int64_t count = 0;
for (const auto& k : data) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
if (evalExprByIndex_) {
if (check(k.first)) {
data_.emplace_back(k.first, k.second);
count++;
}
} else {
const auto& schemas =
Expand All @@ -91,8 +105,12 @@ class IndexFilterNode final : public RelNode<T> {
}
if (check(reader.get(), k.first)) {
data_.emplace_back(k.first, k.second);
count++;
}
}
if (limit_ > 0 && count >= limit_) {
break;
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
Expand Down Expand Up @@ -143,6 +161,7 @@ class IndexFilterNode final : public RelNode<T> {
Expression* filterExp_;
bool isEdge_;
bool evalExprByIndex_;
int64_t limit_;
std::vector<kvstore::KV> data_{};
};

Expand Down
8 changes: 4 additions & 4 deletions src/storage/exec/IndexScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class IndexScanNode : public RelNode<T> {
IndexScanNode(RuntimeContext* context,
IndexID indexId,
std::vector<cpp2::IndexColumnHint> columnHints,
int64_t limit)
int64_t limit = -1)
: context_(context), indexId_(indexId), columnHints_(std::move(columnHints)), limit_(limit) {
/**
* columnHints's elements are {scanType = PREFIX|RANGE; beginStr; endStr},
Expand Down Expand Up @@ -74,9 +74,6 @@ class IndexScanNode : public RelNode<T> {
data_.clear();
int64_t count = 0;
while (!!iter_ && iter_->valid()) {
if (limit_ > -1 && count++ == limit_) {
break;
}
if (context_->isPlanKilled()) {
return {};
}
Expand All @@ -89,6 +86,9 @@ class IndexScanNode : public RelNode<T> {
}
}
data_.emplace_back(iter_->key(), "");
if (limit_ > 0 && ++count >= limit_) {
break;
}
iter_->next();
}
return std::move(data_);
Expand Down
11 changes: 6 additions & 5 deletions src/storage/exec/IndexVertexNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class IndexVertexNode final : public RelNode<T> {
IndexScanNode<T>* indexScanNode,
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
const std::string& schemaName,
int64_t limit)
int64_t limit = -1)
: context_(context),
indexScanNode_(indexScanNode),
schemas_(schemas),
Expand All @@ -42,11 +42,8 @@ class IndexVertexNode final : public RelNode<T> {
data_.clear();
std::vector<VertexID> vids;
auto* iter = static_cast<VertexIndexIterator*>(indexScanNode_->iterator());
int64_t count = 0;

while (iter && iter->valid()) {
if (limit_ > -1 && count++ == limit_) {
break;
}
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
Expand All @@ -61,6 +58,7 @@ class IndexVertexNode final : public RelNode<T> {
vids.emplace_back(iter->vId());
iter->next();
}
int64_t count = 0;
for (const auto& vId : vids) {
VLOG(1) << "partId " << partId << ", vId " << vId << ", tagId " << context_->tagId_;
auto key = NebulaKeyUtils::vertexKey(context_->vIdLen(), partId, vId, context_->tagId_);
Expand All @@ -73,6 +71,9 @@ class IndexVertexNode final : public RelNode<T> {
} else {
return ret;
}
if (limit_ > 0 && ++count >= limit_) {
break;
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
Expand Down
28 changes: 15 additions & 13 deletions src/storage/index/LookupBaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ template <typename REQ, typename RESP>
StatusOr<StoragePlan<IndexID>> LookupBaseProcessor<REQ, RESP>::buildPlan(
IndexFilterItem* filterItem, nebula::DataSet* result) {
StoragePlan<IndexID> plan;
// TODO(sky) : Limit is not supported yet for de-dup node.
// Related to paging scan, the de-dup execution plan needs to be refactored
auto deDup = std::make_unique<DeDupNode<IndexID>>(result, deDupColPos_);
int32_t filterId = 0;
std::unique_ptr<IndexOutputNode<IndexID>> out;
Expand Down Expand Up @@ -319,8 +321,8 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
auto indexId = ctx.get_index_id();
auto colHints = ctx.get_column_hints();

auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
context_.get(), indexId, std::move(colHints), limit_);
auto indexScan =
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));
if (context_->isEdge()) {
auto edge = std::make_unique<IndexEdgeNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->edgeName_, limit_);
Expand Down Expand Up @@ -370,11 +372,11 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
auto indexId = ctx.get_index_id();
auto colHints = ctx.get_column_hints();

auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
context_.get(), indexId, std::move(colHints), limit_);
auto indexScan =
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));

auto filter = std::make_unique<IndexFilterNode<IndexID>>(
context_.get(), indexScan.get(), exprCtx, exp, context_->isEdge());
context_.get(), indexScan.get(), exprCtx, exp, context_->isEdge(), limit_);
filter->addDependency(indexScan.get());
auto output =
std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), filter.get(), true);
Expand Down Expand Up @@ -421,14 +423,14 @@ LookupBaseProcessor<REQ, RESP>::buildPlanWithDataAndFilter(nebula::DataSet* resu
auto indexId = ctx.get_index_id();
auto colHints = ctx.get_column_hints();

auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
context_.get(), indexId, std::move(colHints), limit_);
auto indexScan =
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));
if (context_->isEdge()) {
auto edge = std::make_unique<IndexEdgeNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->edgeName_, limit_);
context_.get(), indexScan.get(), schemas_, context_->edgeName_);
edge->addDependency(indexScan.get());
auto filter =
std::make_unique<IndexFilterNode<IndexID>>(context_.get(), edge.get(), exprCtx, exp);
auto filter = std::make_unique<IndexFilterNode<IndexID>>(
context_.get(), edge.get(), exprCtx, exp, limit_);
filter->addDependency(edge.get());

auto output = std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), filter.get());
Expand All @@ -439,10 +441,10 @@ LookupBaseProcessor<REQ, RESP>::buildPlanWithDataAndFilter(nebula::DataSet* resu
return output;
} else {
auto vertex = std::make_unique<IndexVertexNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->tagName_, limit_);
context_.get(), indexScan.get(), schemas_, context_->tagName_);
vertex->addDependency(indexScan.get());
auto filter =
std::make_unique<IndexFilterNode<IndexID>>(context_.get(), vertex.get(), exprCtx, exp);
auto filter = std::make_unique<IndexFilterNode<IndexID>>(
context_.get(), vertex.get(), exprCtx, exp, limit_);
filter->addDependency(vertex.get());

auto output = std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), filter.get());
Expand Down
Loading

0 comments on commit f0ac932

Please sign in to comment.