Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/vesoft-inc/nebula into en…
Browse files Browse the repository at this point in the history
…hancement/error-msg-more-friendly
  • Loading branch information
Shylock-Hg committed Dec 27, 2021
2 parents 5d92224 + fd8d8a8 commit 16fdcb2
Show file tree
Hide file tree
Showing 34 changed files with 834 additions and 113 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,4 @@ venv/

#ctags
.tags
/.vs
3 changes: 3 additions & 0 deletions .linters/cpp/checkKeyword.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@
'KW_ZONE',
'KW_ZONES',
'KW_RENAME',
'KW_IGNORE_EXISTED_INDEX',
'KW_GEOGRAPHY',
'KW_DURATION',
]


Expand Down
10 changes: 10 additions & 0 deletions src/common/expression/LogicalExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ class LogicalExpression final : public Expression {
return true;
}

void reverseLogicalKind() {
if (kind_ == Kind::kLogicalAnd) {
kind_ = Kind::kLogicalOr;
} else if (kind_ == Kind::kLogicalOr) {
kind_ = Kind::kLogicalAnd;
} else {
LOG(FATAL) << "Should not reverse logical expression except and/or kind.";
}
}

private:
explicit LogicalExpression(ObjectPool* pool, Kind kind) : Expression(pool, kind) {}

Expand Down
17 changes: 10 additions & 7 deletions src/common/memory/MemoryUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <folly/String.h>
#include <gflags/gflags.h>

#include <algorithm>
#include <cstdio>
#include <fstream>
#include <regex>
Expand All @@ -21,7 +22,9 @@ using nebula::fs::FileUtils;

namespace nebula {

static const std::regex reMemAvailable(R"(^Mem(Available|Total):\s+(\d+)\skB$)");
static const std::regex reMemAvailable(
R"(^Mem(Available|Free|Total):\s+(\d+)\skB$)"); // when can't use MemAvailable, use MemFree
// instead.
static const std::regex reTotalCache(R"(^total_(cache|inactive_file)\s+(\d+)$)");

std::atomic_bool MemoryUtils::kHitMemoryHighWatermark{false};
Expand Down Expand Up @@ -63,13 +66,13 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
auto& sm = iter.matched();
memorySize.emplace_back(std::stoul(sm[2].str(), NULL) << 10);
}
CHECK_EQ(memorySize.size(), 2U);
size_t i = 0, j = 1;
if (memorySize[0] < memorySize[1]) {
std::swap(i, j);
std::sort(memorySize.begin(), memorySize.end());
if (memorySize.size() >= 2u) {
total = memorySize.back();
available = memorySize[memorySize.size() - 2];
} else {
return false;
}
total = memorySize[i];
available = memorySize[j];
}

auto hits = (1 - available / total) > FLAGS_system_memory_high_watermark_ratio;
Expand Down
3 changes: 0 additions & 3 deletions src/graph/context/Iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ Value GetNeighborsIter::getVertex(const std::string& name) const {

List GetNeighborsIter::getVertices() {
List vertices;
vertices.values.reserve(size());
valid_ = true;
colIdx_ = -2;
for (currentDs_ = dsIndices_.begin(); currentDs_ < dsIndices_.end(); ++currentDs_) {
Expand Down Expand Up @@ -513,7 +512,6 @@ Value GetNeighborsIter::getEdge() const {

List GetNeighborsIter::getEdges() {
List edges;
edges.values.reserve(size());
for (; valid(); next()) {
auto edge = getEdge();
if (edge.isEdge()) {
Expand Down Expand Up @@ -862,7 +860,6 @@ Value PropIter::getEdge() const {
List PropIter::getVertices() {
DCHECK(iter_ == rows_->begin());
List vertices;
vertices.values.reserve(size());
for (; valid(); next()) {
vertices.values.emplace_back(getVertex());
}
Expand Down
2 changes: 1 addition & 1 deletion src/graph/context/Iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class GetNeighborsIter final : public Iterator {
void sample(int64_t count) override;

size_t size() const override {
return 0;
LOG(FATAL) << "Unimplemented method for Get Neighbros iterator.";
}

const Value& getColumn(const std::string& col) const override;
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ void Executor::drop() {
Status Executor::finish(Result &&result) {
if (!FLAGS_enable_lifetime_optimize ||
node()->outputVarPtr()->userCount.load(std::memory_order_relaxed) != 0) {
numRows_ = result.size();
numRows_ = !result.iterRef()->isGetNeighborsIter() ? result.size() : 0;
result.checkMemory(node()->isQueryNode());
ectx_->setResult(node()->outputVar(), std::move(result));
} else {
Expand Down
5 changes: 3 additions & 2 deletions src/graph/executor/StorageAccessExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ DataSet buildRequestDataSet(const SpaceInfo &space,
bool dedup) {
DCHECK(iter && expr) << "iter=" << iter << ", expr=" << expr;
nebula::DataSet vertices({kVid});
vertices.rows.reserve(iter->size());
auto s = !iter->isGetNeighborsIter() ? iter->size() : 0;
vertices.rows.reserve(s);

std::unordered_set<VidType> uniqueSet;
uniqueSet.reserve(iter->size());
uniqueSet.reserve(s);

const auto &vidType = *(space.spaceDesc.vid_type_ref());

Expand Down
65 changes: 43 additions & 22 deletions src/graph/executor/admin/SessionExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ folly::Future<Status> ShowSessionsExecutor::execute() {
auto *showNode = asNode<ShowSessions>(node());
if (showNode->isSetSessionID()) {
return getSession(showNode->getSessionId());
} else {
return listSessions();
}
return showNode->isLocalCommand() ? listLocalSessions() : listSessions();
}

folly::Future<Status> ShowSessionsExecutor::listSessions() {
Expand All @@ -31,6 +30,7 @@ folly::Future<Status> ShowSessionsExecutor::listSessions() {
return Status::Error("Show sessions failed: %s.", resp.status().toString().c_str());
}
auto sessions = resp.value().get_sessions();
// Construct result column names
DataSet result({"SessionId",
"UserName",
"SpaceName",
Expand All @@ -40,21 +40,28 @@ folly::Future<Status> ShowSessionsExecutor::listSessions() {
"Timezone",
"ClientIp"});
for (auto &session : sessions) {
Row row;
row.emplace_back(session.get_session_id());
row.emplace_back(session.get_user_name());
row.emplace_back(session.get_space_name());
row.emplace_back(microSecToDateTime(session.get_create_time()));
row.emplace_back(microSecToDateTime(session.get_update_time()));
row.emplace_back(network::NetworkUtils::toHostsStr({session.get_graph_addr()}));
row.emplace_back(session.get_timezone());
row.emplace_back(session.get_client_ip());
result.emplace_back(std::move(row));
addSessions(session, result);
}
return finish(ResultBuilder().value(Value(std::move(result))).build());
});
}

folly::Future<Status> ShowSessionsExecutor::listLocalSessions() {
auto localSessions = qctx_->rctx()->sessionMgr()->getSessionFromLocalCache();
DataSet result({"SessionId",
"UserName",
"SpaceName",
"CreateTime",
"UpdateTime",
"GraphAddr",
"Timezone",
"ClientIp"});
for (auto &session : localSessions) {
addSessions(session, result);
}
return finish(ResultBuilder().value(Value(std::move(result))).build());
}

folly::Future<Status> ShowSessionsExecutor::getSession(SessionID sessionId) {
return qctx()->getMetaClient()->getSession(sessionId).via(runner()).thenValue(
[this, sessionId](StatusOr<meta::cpp2::GetSessionResp> resp) {
Expand All @@ -64,20 +71,34 @@ folly::Future<Status> ShowSessionsExecutor::getSession(SessionID sessionId) {
"Get session `%ld' failed: %s.", sessionId, resp.status().toString().c_str());
}
auto session = resp.value().get_session();
DataSet result({"VariableName", "Value"});
result.emplace_back(Row({"SessionID", session.get_session_id()}));
result.emplace_back(Row({"UserName", session.get_user_name()}));
result.emplace_back(Row({"SpaceName", session.get_space_name()}));
result.emplace_back(Row({"CreateTime", microSecToDateTime(session.get_create_time())}));
result.emplace_back(Row({"UpdateTime", microSecToDateTime(session.get_update_time())}));
result.emplace_back(
Row({"GraphAddr", network::NetworkUtils::toHostsStr({session.get_graph_addr()})}));
result.emplace_back(Row({"Timezone", session.get_timezone()}));
result.emplace_back(Row({"ClientIp", session.get_client_ip()}));

// Construct result column names
DataSet result({"SessionId",
"UserName",
"SpaceName",
"CreateTime",
"UpdateTime",
"GraphAddr",
"Timezone",
"ClientIp"});
addSessions(session, result);
return finish(ResultBuilder().value(Value(std::move(result))).build());
});
}

void ShowSessionsExecutor::addSessions(const meta::cpp2::Session &session, DataSet &dataSet) const {
Row row;
row.emplace_back(session.get_session_id());
row.emplace_back(session.get_user_name());
row.emplace_back(session.get_space_name());
row.emplace_back(microSecToDateTime(session.get_create_time()));
row.emplace_back(microSecToDateTime(session.get_update_time()));
row.emplace_back(network::NetworkUtils::toHostsStr({session.get_graph_addr()}));
row.emplace_back(session.get_timezone());
row.emplace_back(session.get_client_ip());
dataSet.emplace_back(std::move(row));
}

folly::Future<Status> UpdateSessionExecutor::execute() {
VLOG(1) << "Update sessions to metad";
SCOPED_TIMER(&execTime_);
Expand Down
7 changes: 6 additions & 1 deletion src/graph/executor/admin/SessionExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@ class ShowSessionsExecutor final : public Executor {
folly::Future<Status> execute() override;

private:
// List sessions in the cluster
folly::Future<Status> listSessions();
// List sessions in the current graph node
folly::Future<Status> listLocalSessions();

folly::Future<Status> getSession(SessionID sessionId);
// Add session info into dataset
void addSessions(const meta::cpp2::Session &session, DataSet &dataSet) const;

DateTime microSecToDateTime(int64_t microSec) {
DateTime microSecToDateTime(const int64_t microSec) const {
auto dateTime = time::TimeConversion::unixSecondsToDateTime(microSec / 1000000);
dateTime.microsec = microSec % 1000000;
return dateTime;
Expand Down
1 change: 0 additions & 1 deletion src/graph/executor/algo/ProduceAllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ folly::Future<Status> ProduceAllPathsExecutor::execute() {
if (!iter->isGetNeighborsIter()) {
return Status::Error("Only accept GetNeighborsIter.");
}
VLOG(1) << "Edge size: " << iter->size();
for (; iter->valid(); iter->next()) {
auto edgeVal = iter->getEdge();
if (!edgeVal.isEdge()) {
Expand Down
1 change: 0 additions & 1 deletion src/graph/executor/algo/SubgraphExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ folly::Future<Status> SubgraphExecutor::execute() {
VLOG(1) << "input: " << subgraph->inputVar() << " output: " << node()->outputVar();
auto iter = ectx_->getResult(subgraph->inputVar()).iter();
DCHECK(iter && iter->isGetNeighborsIter());
ds.rows.reserve(iter->size());
if (currentStep == 1) {
for (; iter->valid(); iter->next()) {
const auto& src = iter->getColumn(nebula::kVid);
Expand Down
3 changes: 1 addition & 2 deletions src/graph/executor/query/FilterExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ folly::Future<Status> FilterExecutor::execute() {
}

VLOG(2) << "Get input var: " << filter->inputVar()
<< ", iterator type: " << static_cast<int16_t>(iter->kind())
<< ", input data size: " << iter->size();
<< ", iterator type: " << static_cast<int16_t>(iter->kind());

ResultBuilder builder;
builder.value(result.valuePtr());
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/ProjectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ folly::Future<Status> ProjectExecutor::execute() {
VLOG(1) << "input: " << project->inputVar();
DataSet ds;
ds.colNames = project->colNames();
ds.rows.reserve(iter->size());
ds.rows.reserve(!iter->isGetNeighborsIter() ? iter->size() : 0);
for (; iter->valid(); iter->next()) {
Row row;
for (auto& col : columns) {
Expand Down
15 changes: 15 additions & 0 deletions src/graph/optimizer/rule/GetEdgesTransformRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ StatusOr<OptRule::TransformResult> GetEdgesTransformRule::transform(
OptGroupNode::create(ctx, newAppendVertices, appendVerticesGroupNode->group());

auto *newScanEdges = traverseToScanEdges(traverse);
if (newScanEdges == nullptr) {
return TransformResult::noTransform();
}
auto newScanEdgesGroup = OptGroup::create(ctx);
auto newScanEdgesGroupNode = newScanEdgesGroup->makeGroupNode(newScanEdges);

Expand Down Expand Up @@ -102,6 +105,18 @@ std::string GetEdgesTransformRule::toString() const {
/*static*/ graph::ScanEdges *GetEdgesTransformRule::traverseToScanEdges(
const graph::Traverse *traverse) {
const auto *edgeProps = traverse->edgeProps();
if (edgeProps == nullptr) {
return nullptr;
}
for (std::size_t i = 0; i < edgeProps->size(); i++) {
auto type = (*edgeProps)[i].get_type();
for (std::size_t j = i + 1; j < edgeProps->size(); j++) {
if (type == -((*edgeProps)[j].get_type())) {
// Don't support to retrieve edges of the inbound/outbound together
return nullptr;
}
}
}
auto scanEdges = ScanEdges::make(
traverse->qctx(),
nullptr,
Expand Down
19 changes: 13 additions & 6 deletions src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -1165,14 +1165,18 @@ class ShowSessions final : public SingleInputNode {
static ShowSessions* make(QueryContext* qctx,
PlanNode* input,
bool isSetSessionID,
SessionID sessionId) {
return qctx->objPool()->add(new ShowSessions(qctx, input, isSetSessionID, sessionId));
SessionID sessionId,
bool isLocalCommand) {
return qctx->objPool()->add(
new ShowSessions(qctx, input, isSetSessionID, sessionId, isLocalCommand));
}

bool isSetSessionID() const {
return isSetSessionID_;
}

bool isLocalCommand() const {
return isLocalCommand_;
}
SessionID getSessionId() const {
return sessionId_;
}
Expand All @@ -1181,15 +1185,18 @@ class ShowSessions final : public SingleInputNode {
explicit ShowSessions(QueryContext* qctx,
PlanNode* input,
bool isSetSessionID,
SessionID sessionId)
SessionID sessionId,
bool isLocalCommand)
: SingleInputNode(qctx, Kind::kShowSessions, input) {
isSetSessionID_ = isSetSessionID;
sessionId_ = sessionId;
isSetSessionID_ = isSetSessionID;
isLocalCommand_ = isLocalCommand;
}

private:
bool isSetSessionID_{false};
SessionID sessionId_{-1};
bool isSetSessionID_{false};
bool isLocalCommand_{false};
};

class UpdateSession final : public SingleInputNode {
Expand Down
9 changes: 9 additions & 0 deletions src/graph/session/GraphSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ folly::Future<StatusOr<std::shared_ptr<ClientSession>>> GraphSessionManager::fin
return metaClient_->getSession(id).via(runner).thenValue(addSession);
}

std::vector<meta::cpp2::Session> GraphSessionManager::getSessionFromLocalCache() const {
std::vector<meta::cpp2::Session> sessions;
sessions.reserve(activeSessions_.size());
for (auto& it : activeSessions_) {
sessions.emplace_back(it.second->getSession());
}
return sessions;
}

folly::Future<StatusOr<std::shared_ptr<ClientSession>>> GraphSessionManager::createSession(
const std::string userName, const std::string clientIp, folly::Executor* runner) {
auto createCB = [this,
Expand Down
4 changes: 4 additions & 0 deletions src/graph/session/GraphSessionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "common/thrift/ThriftTypes.h"
#include "graph/session/ClientSession.h"
#include "interface/gen-cpp2/GraphService.h"
#include "interface/gen-cpp2/meta_types.h"

/**
* GraphSessionManager manages the client sessions, e.g. create new, find
Expand Down Expand Up @@ -60,6 +61,9 @@ class GraphSessionManager final : public SessionManager<ClientSession> {
*/
std::shared_ptr<ClientSession> findSessionFromCache(SessionID id);

// get all seesions in the local cache
std::vector<meta::cpp2::Session> getSessionFromLocalCache() const;

private:
folly::Future<StatusOr<std::shared_ptr<ClientSession>>> findSessionFromMetad(
SessionID id, folly::Executor* runner);
Expand Down
Loading

0 comments on commit 16fdcb2

Please sign in to comment.