Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the code of meta session manager and remove the session lock #5762

Merged
merged 7 commits into from
Dec 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
326 changes: 176 additions & 150 deletions src/meta/processors/session/SessionManagerProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,8 @@
namespace nebula {
namespace meta {

void CreateSessionProcessor::process(const cpp2::CreateSessionReq& req) {
folly::SharedMutex::WriteHolder holder(LockUtils::sessionLock());
const auto& user = req.get_user();
auto ret = userExist(user);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "User does not exist, errorCode: " << apache::thrift::util::enumNameSafe(ret);
handleErrorCode(ret);
onFinished();
return;
}

void CreateSessionProcessor::process(const cpp2::CreateSessionReq &req) {
auto user = req.get_user();
cpp2::Session session;
// The sessionId is generated by microsecond timestamp
session.session_id_ref() = time::WallClock::fastNowInMicroSec();
Expand All @@ -27,97 +18,120 @@ void CreateSessionProcessor::process(const cpp2::CreateSessionReq& req) {
session.user_name_ref() = user;
session.graph_addr_ref() = req.get_graph_addr();
session.client_ip_ref() = req.get_client_ip();

std::vector<kvstore::KV> data;
data.emplace_back(MetaKeyUtils::sessionKey(session.get_session_id()),
MetaKeyUtils::sessionVal(session));
resp_.session_ref() = session;
ret = doSyncPut(std::move(data));
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Put data error on meta server, errorCode: "
<< apache::thrift::util::enumNameSafe(ret);
}
handleErrorCode(ret);
onFinished();
}

void UpdateSessionsProcessor::process(const cpp2::UpdateSessionsReq& req) {
folly::SharedMutex::WriteHolder holder(LockUtils::sessionLock());
std::vector<kvstore::KV> data;
std::unordered_map<nebula::SessionID,
std::unordered_map<nebula::ExecutionPlanID, cpp2::QueryDesc>>
killedQueries;
// AtomicOp for create
auto getAtomicOp = [this, session = std::move(session), user = std::move(user)]() mutable {
kvstore::MergeableAtomicOpResult atomicOp;
// read userKey
atomicOp.readSet.emplace_back(MetaKeyUtils::userKey(user));
auto ret = userExist(user);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "User does not exist, errorCode: " << apache::thrift::util::enumNameSafe(ret);
atomicOp.code = ret;
return atomicOp;
}
auto batchHolder = std::make_unique<kvstore::BatchHolder>();
auto sessionKey = MetaKeyUtils::sessionKey(session.get_session_id());
// write sessionKey
atomicOp.writeSet.emplace_back(sessionKey);
batchHolder->put(std::move(sessionKey), MetaKeyUtils::sessionVal(session));
atomicOp.batch = encodeBatchValue(batchHolder->getBatch());
atomicOp.code = nebula::cpp2::ErrorCode::SUCCEEDED;
return atomicOp;
};

auto cb = [this](nebula::cpp2::ErrorCode ec) {
if (ec != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Put data error on meta server, errorCode: "
<< apache::thrift::util::enumNameSafe(ec);
}
handleErrorCode(ec);
onFinished();
};

std::vector<SessionID> killedSessions;
kvstore_->asyncAtomicOp(kDefaultSpaceId, kDefaultPartId, std::move(getAtomicOp), std::move(cb));
}

for (auto& session : req.get_sessions()) {
auto sessionId = session.get_session_id();
auto sessionKey = MetaKeyUtils::sessionKey(sessionId);
auto ret = doGet(sessionKey);
if (!nebula::ok(ret)) {
auto errCode = nebula::error(ret);
LOG(INFO) << "Session id '" << sessionId << "' not found";
// If the session requested to be updated can not be found in meta, we consider the session
// has been killed
if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
killedSessions.emplace_back(sessionId);
continue;
} else {
handleErrorCode(errCode);
onFinished();
return;
void UpdateSessionsProcessor::process(const cpp2::UpdateSessionsReq &req) {
// AtomicOp for update
auto getAtomicOp = [this, req]() mutable {
kvstore::MergeableAtomicOpResult atomicOp;
auto batchHolder = std::make_unique<kvstore::BatchHolder>();
std::unordered_map<nebula::SessionID,
std::unordered_map<nebula::ExecutionPlanID, cpp2::QueryDesc>>
killedQueries;
std::vector<SessionID> killedSessions;

for (auto &session : req.get_sessions()) {
auto sessionId = session.get_session_id();
auto sessionKey = MetaKeyUtils::sessionKey(sessionId);
atomicOp.readSet.emplace_back(sessionKey);
auto ret = doGet(sessionKey);
if (!nebula::ok(ret)) {
auto errCode = nebula::error(ret);
// If the session requested to be updated can not be found in meta, we consider the session
// has been killed
if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
killedSessions.emplace_back(sessionId);
continue;
} else {
atomicOp.code = errCode;
return atomicOp;
}
}
}

// update sessions to be saved if query is being killed, and return them to
// client.
auto& newQueries = *session.queries_ref();
std::unordered_map<nebula::ExecutionPlanID, cpp2::QueryDesc> killedQueriesInCurrentSession;
auto sessionInMeta = MetaKeyUtils::parseSessionVal(nebula::value(ret));
for (const auto& savedQuery : sessionInMeta.get_queries()) {
auto epId = savedQuery.first;
auto newQuery = newQueries.find(epId);
if (newQuery == newQueries.end()) {
continue;
// update sessions to be saved if query is being killed, and return them to
// client.
auto &newQueries = *session.queries_ref();
std::unordered_map<nebula::ExecutionPlanID, cpp2::QueryDesc> killedQueriesInCurrentSession;
auto sessionInMeta = MetaKeyUtils::parseSessionVal(nebula::value(ret));
for (const auto &savedQuery : sessionInMeta.get_queries()) {
auto epId = savedQuery.first;
auto newQuery = newQueries.find(epId);
if (newQuery == newQueries.end()) {
continue;
}
auto &desc = savedQuery.second;
if (desc.get_status() == cpp2::QueryStatus::KILLING) {
const_cast<cpp2::QueryDesc &>(newQuery->second).status_ref() = cpp2::QueryStatus::KILLING;
killedQueriesInCurrentSession.emplace(epId, desc);
}
}
auto& desc = savedQuery.second;
if (desc.get_status() == cpp2::QueryStatus::KILLING) {
const_cast<cpp2::QueryDesc&>(newQuery->second).status_ref() = cpp2::QueryStatus::KILLING;
killedQueriesInCurrentSession.emplace(epId, desc);
if (!killedQueriesInCurrentSession.empty()) {
killedQueries[sessionId] = std::move(killedQueriesInCurrentSession);
}
}
if (!killedQueriesInCurrentSession.empty()) {
killedQueries[sessionId] = std::move(killedQueriesInCurrentSession);
}

if (sessionInMeta.get_update_time() > session.get_update_time()) {
VLOG(3) << "The session id: " << session.get_session_id()
<< ", the new update time: " << session.get_update_time()
<< ", the old update time: " << sessionInMeta.get_update_time();
continue;
if (sessionInMeta.get_update_time() > session.get_update_time()) {
continue;
}

atomicOp.writeSet.emplace_back(sessionKey);
batchHolder->put(std::move(sessionKey), MetaKeyUtils::sessionVal(session));
}

data.emplace_back(MetaKeyUtils::sessionKey(sessionId), MetaKeyUtils::sessionVal(session));
}
resp_.killed_queries_ref() = std::move(killedQueries);
resp_.killed_sessions_ref() = std::move(killedSessions);

atomicOp.batch = encodeBatchValue(batchHolder->getBatch());
atomicOp.code = nebula::cpp2::ErrorCode::SUCCEEDED;
return atomicOp;
};

auto ret = doSyncPut(std::move(data));
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Put data error on meta server, errorCode: "
<< apache::thrift::util::enumNameSafe(ret);
handleErrorCode(ret);
auto cb = [this](nebula::cpp2::ErrorCode ec) {
if (ec != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Put data error on meta server, errorCode: "
<< apache::thrift::util::enumNameSafe(ec);
}
handleErrorCode(ec);
onFinished();
return;
}
};

resp_.killed_queries_ref() = std::move(killedQueries);
resp_.killed_sessions_ref() = std::move(killedSessions);
handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED);
onFinished();
kvstore_->asyncAtomicOp(kDefaultSpaceId, kDefaultPartId, std::move(getAtomicOp), std::move(cb));
}

void ListSessionsProcessor::process(const cpp2::ListSessionsReq&) {
folly::SharedMutex::ReadHolder holder(LockUtils::sessionLock());
auto& prefix = MetaKeyUtils::sessionPrefix();
void ListSessionsProcessor::process(const cpp2::ListSessionsReq &) {
auto &prefix = MetaKeyUtils::sessionPrefix();
auto ret = doPrefix(prefix);
if (!nebula::ok(ret)) {
handleErrorCode(nebula::error(ret));
Expand All @@ -139,8 +153,7 @@ void ListSessionsProcessor::process(const cpp2::ListSessionsReq&) {
onFinished();
}

void GetSessionProcessor::process(const cpp2::GetSessionReq& req) {
folly::SharedMutex::ReadHolder holder(LockUtils::sessionLock());
void GetSessionProcessor::process(const cpp2::GetSessionReq &req) {
auto sessionId = req.get_session_id();
auto sessionKey = MetaKeyUtils::sessionKey(sessionId);
auto ret = doGet(sessionKey);
Expand All @@ -161,47 +174,53 @@ void GetSessionProcessor::process(const cpp2::GetSessionReq& req) {
onFinished();
}

void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq& req) {
folly::SharedMutex::WriteHolder holder(LockUtils::sessionLock());
void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq &req) {
std::vector<SessionID> killedSessions;

auto sessionIds = req.get_session_ids();

for (auto sessionId : sessionIds) {
auto sessionKey = MetaKeyUtils::sessionKey(sessionId);
auto ret = doGet(sessionKey);

if (!nebula::ok(ret)) {
auto errCode = nebula::error(ret);
LOG(INFO) << "Session id `" << sessionId << "' not found";

// If the session is not found, we should continue to remove other sessions.
if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
continue;
} else { // for other error like leader change, we handle the error and return.
handleErrorCode(errCode);
onFinished();
return;
// AtomicOp for remove
auto getAtomicOp = [this, sessionId] {
kvstore::MergeableAtomicOpResult atomicOp;
auto batchHolder = std::make_unique<kvstore::BatchHolder>();
auto sessionKey = MetaKeyUtils::sessionKey(sessionId);
atomicOp.readSet.emplace_back(sessionKey);
auto ret = doGet(sessionKey);
if (!nebula::ok(ret)) {
atomicOp.code = nebula::error(ret);
return atomicOp;
}
}
atomicOp.writeSet.emplace_back(sessionKey);
batchHolder->remove(std::move(sessionKey));
atomicOp.batch = encodeBatchValue(batchHolder->getBatch());
atomicOp.code = nebula::cpp2::ErrorCode::SUCCEEDED;
return atomicOp;
};

// Remove session key from kvstore
folly::Baton<true, std::atomic> baton;
nebula::cpp2::ErrorCode errorCode;
kvstore_->asyncRemove(kDefaultSpaceId,
kDefaultPartId,
sessionKey,
[this, &baton, &errorCode](nebula::cpp2::ErrorCode code) {
this->handleErrorCode(code);
errorCode = code;
baton.post();
});
kvstore_->asyncAtomicOp(kDefaultSpaceId,
kDefaultPartId,
std::move(getAtomicOp),
[this, &baton, &errorCode](nebula::cpp2::ErrorCode code) {
this->handleErrorCode(code);
errorCode = code;
baton.post();
});
baton.wait();

// continue if the session is not removed successfully
if (errorCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Remove session key failed, error code: " << static_cast<int32_t>(errorCode);
continue;
if (errorCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
continue;
} else {
// for other error like leader change, we handle the error and return.
handleErrorCode(errorCode);
onFinished();
return;
}
}

// record the removed session id
Expand All @@ -213,47 +232,54 @@ void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq& req) {
onFinished();
}

void KillQueryProcessor::process(const cpp2::KillQueryReq& req) {
folly::SharedMutex::WriteHolder holder(LockUtils::sessionLock());
auto& killQueries = req.get_kill_queries();

std::vector<kvstore::KV> data;
for (auto& kv : killQueries) {
auto sessionId = kv.first;
auto sessionKey = MetaKeyUtils::sessionKey(sessionId);
auto ret = doGet(sessionKey);
if (!nebula::ok(ret)) {
auto errCode = nebula::error(ret);
LOG(INFO) << "Session id `" << sessionId << "' not found";
if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
errCode = nebula::cpp2::ErrorCode::E_SESSION_NOT_FOUND;
void KillQueryProcessor::process(const cpp2::KillQueryReq &req) {
auto &killQueries = req.get_kill_queries();
auto getAtomicOp = [this, killQueries] {
kvstore::MergeableAtomicOpResult atomicOp;
auto batchHolder = std::make_unique<kvstore::BatchHolder>();
for (auto &kv : killQueries) {
auto sessionId = kv.first;
auto sessionKey = MetaKeyUtils::sessionKey(sessionId);
atomicOp.readSet.emplace_back(sessionKey);
auto ret = doGet(sessionKey);
if (!nebula::ok(ret)) {
auto errCode = nebula::error(ret);
if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
errCode = nebula::cpp2::ErrorCode::E_SESSION_NOT_FOUND;
}
atomicOp.code = errCode;
return atomicOp;
}
handleErrorCode(errCode);
onFinished();
return;
}

auto session = MetaKeyUtils::parseSessionVal(nebula::value(ret));
for (auto& epId : kv.second) {
auto query = session.queries_ref()->find(epId);
if (query == session.queries_ref()->end()) {
handleErrorCode(nebula::cpp2::ErrorCode::E_QUERY_NOT_FOUND);
onFinished();
return;
auto session = MetaKeyUtils::parseSessionVal(nebula::value(ret));
for (auto &epId : kv.second) {
auto query = session.queries_ref()->find(epId);
if (query == session.queries_ref()->end()) {
atomicOp.code = nebula::cpp2::ErrorCode::E_QUERY_NOT_FOUND;
return atomicOp;
}
query->second.status_ref() = cpp2::QueryStatus::KILLING;
}
query->second.status_ref() = cpp2::QueryStatus::KILLING;

atomicOp.writeSet.emplace_back(sessionKey);
batchHolder->put(std::move(sessionKey), MetaKeyUtils::sessionVal(session));
}

data.emplace_back(MetaKeyUtils::sessionKey(sessionId), MetaKeyUtils::sessionVal(session));
}
atomicOp.batch = encodeBatchValue(batchHolder->getBatch());
atomicOp.code = nebula::cpp2::ErrorCode::SUCCEEDED;
return atomicOp;
};

auto putRet = doSyncPut(std::move(data));
if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Put data error on meta server, errorCode: "
<< apache::thrift::util::enumNameSafe(putRet);
}
handleErrorCode(putRet);
onFinished();
auto cb = [this](nebula::cpp2::ErrorCode ec) {
if (ec != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Put data error on meta server, errorCode: "
<< apache::thrift::util::enumNameSafe(ec);
}
handleErrorCode(ec);
onFinished();
};

kvstore_->asyncAtomicOp(kDefaultSpaceId, kDefaultPartId, std::move(getAtomicOp), std::move(cb));
}

} // namespace meta
Expand Down