From a5f3f65750c85c737136b80248a7b6404e92e2dd Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Thu, 23 Dec 2021 18:29:33 +0800 Subject: [PATCH] use rcu replace thread local fix storage exit crash format address some comment --- src/clients/meta/MetaClient.cpp | 415 +++++++++++++++-------------- src/clients/meta/MetaClient.h | 16 +- src/storage/test/ChainTestUtils.h | 6 +- src/storage/test/KillQueryTest.cpp | 2 +- 4 files changed, 232 insertions(+), 207 deletions(-) diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index bf5ca8f6d44..b0d1266ca35 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -41,14 +41,15 @@ DEFINE_int32(check_plan_killed_frequency, 8, "check plan killed every 1< indexItemVec); + MetaClient::MetaClient(std::shared_ptr ioThreadPool, std::vector addrs, const MetaClientOptions& options) : ioThreadPool_(ioThreadPool), addrs_(std::move(addrs)), options_(options), - sessionMap_(new SessionMap{}), - killedPlans_(new folly::F14FastSet>{}) { + metadata_(new MetaData()) { CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required"; CHECK(!addrs_.empty()) << "No meta server address is specified or can be solved. Meta server is required"; @@ -64,8 +65,7 @@ MetaClient::MetaClient(std::shared_ptr ioThreadPool MetaClient::~MetaClient() { stop(); - delete sessionMap_.load(); - delete killedPlans_.load(); + delete metadata_.load(); VLOG(3) << "~MetaClient"; } @@ -161,11 +161,8 @@ bool MetaClient::loadUsersAndRoles() { userRolesMap[user.first] = rolesRet.value(); userPasswordMap[user.first] = user.second; } - { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); - userRolesMap_ = std::move(userRolesMap); - userPasswordMap_ = std::move(userPasswordMap); - } + userRolesMap_ = std::move(userRolesMap); + userPasswordMap_ = std::move(userPasswordMap); return true; } @@ -286,7 +283,6 @@ bool MetaClient::loadData() { decltype(localCache_) oldCache; { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); oldCache = std::move(localCache_); localCache_ = std::move(cache); spaceIndexByName_ = std::move(spaceIndexByName); @@ -301,7 +297,37 @@ bool MetaClient::loadData() { } localDataLastUpdateTime_.store(metadLastUpdateTime_.load()); - + auto newMetaData = new MetaData(); + + for (auto& spaceInfo : localCache_) { + GraphSpaceID spaceId = spaceInfo.first; + std::shared_ptr info = spaceInfo.second; + std::shared_ptr infoDeepCopy = std::make_shared(*info); + infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_); + infoDeepCopy->edgeSchemas_ = buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_); + infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_); + infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_); + newMetaData->localCache_[spaceId] = infoDeepCopy; + } + newMetaData->spaceIndexByName_ = spaceIndexByName_; + newMetaData->spaceTagIndexByName_ = spaceTagIndexByName_; + newMetaData->spaceEdgeIndexByName_ = spaceEdgeIndexByName_; + newMetaData->spaceEdgeIndexByType_ = spaceEdgeIndexByType_; + newMetaData->spaceNewestTagVerMap_ = spaceNewestTagVerMap_; + newMetaData->spaceNewestEdgeVerMap_ = spaceNewestEdgeVerMap_; + newMetaData->spaceTagIndexById_ = spaceTagIndexById_; + newMetaData->spaceAllEdgeMap_ = spaceAllEdgeMap_; + + newMetaData->userRolesMap_ = userRolesMap_; + newMetaData->storageHosts_ = storageHosts_; + newMetaData->fulltextIndexMap_ = fulltextIndexMap_; + newMetaData->userPasswordMap_ = userPasswordMap_; + newMetaData->sessionMap_ = std::move(sessionMap_); + newMetaData->killedPlans_ = std::move(killedPlans_); + newMetaData->fulltextClientList_ = std::move(fulltextClientList_); + auto oldMetaData = metadata_.load(); + metadata_.store(newMetaData); + folly::rcu_retire(oldMetaData); diff(oldCache, localCache_); listenerDiff(oldCache, localCache_); loadRemoteListeners(); @@ -455,7 +481,7 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId, return true; } -static Indexes buildIndexes(std::vector indexItemVec) { +Indexes buildIndexes(std::vector indexItemVec) { Indexes indexes; for (auto index : indexItemVec) { auto indexName = index.get_index_name(); @@ -524,10 +550,7 @@ bool MetaClient::loadGlobalServiceClients() { LOG(ERROR) << "List services failed, status:" << ret.status(); return false; } - { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); - serviceClientList_ = std::move(ret).value(); - } + serviceClientList_ = std::move(ret).value(); return true; } @@ -537,53 +560,15 @@ bool MetaClient::loadFulltextIndexes() { LOG(ERROR) << "List fulltext indexes failed, status:" << ftRet.status(); return false; } - { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); - fulltextIndexMap_ = std::move(ftRet).value(); - } + fulltextIndexMap_ = std::move(ftRet).value(); return true; } -const MetaClient::ThreadLocalInfo& MetaClient::getThreadLocalInfo() { - ThreadLocalInfo& threadLocalInfo = folly::SingletonThreadLocal::get(); - - if (threadLocalInfo.localLastUpdateTime_ < localDataLastUpdateTime_) { - threadLocalInfo.localLastUpdateTime_ = localDataLastUpdateTime_; - - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - for (auto& spaceInfo : localCache_) { - GraphSpaceID spaceId = spaceInfo.first; - std::shared_ptr info = spaceInfo.second; - std::shared_ptr infoDeepCopy = std::make_shared(*info); - infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_); - infoDeepCopy->edgeSchemas_ = - buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_); - infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_); - infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_); - threadLocalInfo.localCache_[spaceId] = infoDeepCopy; - } - threadLocalInfo.spaceIndexByName_ = spaceIndexByName_; - threadLocalInfo.spaceTagIndexByName_ = spaceTagIndexByName_; - threadLocalInfo.spaceEdgeIndexByName_ = spaceEdgeIndexByName_; - threadLocalInfo.spaceEdgeIndexByType_ = spaceEdgeIndexByType_; - threadLocalInfo.spaceNewestTagVerMap_ = spaceNewestTagVerMap_; - threadLocalInfo.spaceNewestEdgeVerMap_ = spaceNewestEdgeVerMap_; - threadLocalInfo.spaceTagIndexById_ = spaceTagIndexById_; - threadLocalInfo.spaceAllEdgeMap_ = spaceAllEdgeMap_; - - threadLocalInfo.userRolesMap_ = userRolesMap_; - threadLocalInfo.storageHosts_ = storageHosts_; - threadLocalInfo.fulltextIndexMap_ = fulltextIndexMap_; - threadLocalInfo.userPasswordMap_ = userPasswordMap_; - } - - return threadLocalInfo; -} - Status MetaClient::checkTagIndexed(GraphSpaceID spaceId, IndexID indexID) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto indexIt = it->second->tagIndexes_.find(indexID); if (indexIt != it->second->tagIndexes_.end()) { return Status::OK(); @@ -595,9 +580,10 @@ Status MetaClient::checkTagIndexed(GraphSpaceID spaceId, IndexID indexID) { } Status MetaClient::checkEdgeIndexed(GraphSpaceID space, IndexID indexID) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(space); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(space); + if (it != metadata.localCache_.end()) { auto indexIt = it->second->edgeIndexes_.find(indexID); if (indexIt != it->second->edgeIndexes_.end()) { return Status::OK(); @@ -1277,9 +1263,10 @@ StatusOr MetaClient::getSpaceIdByNameFromCache(const std::string& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceIndexByName_.find(name); - if (it != threadLocalInfo.spaceIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceIndexByName_.find(name); + if (it != metadata.spaceIndexByName_.end()) { return it->second; } return Status::SpaceNotFound(); @@ -1289,9 +1276,10 @@ StatusOr MetaClient::getSpaceNameByIdFromCache(GraphSpaceID spaceId if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -1303,9 +1291,10 @@ StatusOr MetaClient::getTagIDByNameFromCache(const GraphSpaceID& space, if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceTagIndexByName_.find(std::make_pair(space, name)); - if (it == threadLocalInfo.spaceTagIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceTagIndexByName_.find(std::make_pair(space, name)); + if (it == metadata.spaceTagIndexByName_.end()) { return Status::Error("TagName `%s' is nonexistent", name.c_str()); } return it->second; @@ -1316,9 +1305,10 @@ StatusOr MetaClient::getTagNameByIdFromCache(const GraphSpaceID& sp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceTagIndexById_.find(std::make_pair(space, tagId)); - if (it == threadLocalInfo.spaceTagIndexById_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceTagIndexById_.find(std::make_pair(space, tagId)); + if (it == metadata.spaceTagIndexById_.end()) { return Status::Error("TagID `%d' is nonexistent", tagId); } return it->second; @@ -1329,9 +1319,10 @@ StatusOr MetaClient::getEdgeTypeByNameFromCache(const GraphSpaceID& sp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceEdgeIndexByName_.find(std::make_pair(space, name)); - if (it == threadLocalInfo.spaceEdgeIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceEdgeIndexByName_.find(std::make_pair(space, name)); + if (it == metadata.spaceEdgeIndexByName_.end()) { return Status::Error("EdgeName `%s' is nonexistent", name.c_str()); } return it->second; @@ -1342,9 +1333,10 @@ StatusOr MetaClient::getEdgeNameByTypeFromCache(const GraphSpaceID& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceEdgeIndexByType_.find(std::make_pair(space, edgeType)); - if (it == threadLocalInfo.spaceEdgeIndexByType_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceEdgeIndexByType_.find(std::make_pair(space, edgeType)); + if (it == metadata.spaceEdgeIndexByType_.end()) { return Status::Error("EdgeType `%d' is nonexistent", edgeType); } return it->second; @@ -1354,9 +1346,10 @@ StatusOr> MetaClient::getAllEdgeFromCache(const GraphSp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceAllEdgeMap_.find(space); - if (it == threadLocalInfo.spaceAllEdgeMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceAllEdgeMap_.find(space); + if (it == metadata.spaceAllEdgeMap_.end()) { return Status::Error("SpaceId `%d' is nonexistent", space); } return it->second; @@ -1489,14 +1482,16 @@ folly::Future> MetaClient::removeRange(std::string segment, } PartsMap MetaClient::getPartsMapFromCache(const HostAddr& host) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return doGetPartsMap(host, threadLocalInfo.localCache_); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return doGetPartsMap(host, metadata.localCache_); } StatusOr MetaClient::getPartHostsFromCache(GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it == metadata.localCache_.end()) { return Status::Error("Space not found, spaceid: %d", spaceId); } auto& cache = it->second; @@ -1514,9 +1509,10 @@ StatusOr MetaClient::getPartHostsFromCache(GraphSpaceID spaceId, Part Status MetaClient::checkPartExistInCache(const HostAddr& host, GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto partsIt = it->second->partsOnHost_.find(host); if (partsIt != it->second->partsOnHost_.end()) { for (auto& pId : partsIt->second) { @@ -1533,9 +1529,10 @@ Status MetaClient::checkPartExistInCache(const HostAddr& host, } Status MetaClient::checkSpaceExistInCache(const HostAddr& host, GraphSpaceID spaceId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto partsIt = it->second->partsOnHost_.find(host); if (partsIt != it->second->partsOnHost_.end() && !partsIt->second.empty()) { return Status::OK(); @@ -1547,9 +1544,10 @@ Status MetaClient::checkSpaceExistInCache(const HostAddr& host, GraphSpaceID spa } StatusOr MetaClient::partsNum(GraphSpaceID spaceId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it == metadata.localCache_.end()) { return Status::Error("Space not found, spaceid: %d", spaceId); } return it->second->partsAlloc_.size(); @@ -1940,9 +1938,10 @@ StatusOr MetaClient::getSpaceVidLen(const GraphSpaceID& spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -1958,9 +1957,10 @@ StatusOr MetaClient::getSpaceVidType(const GraphSpac if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -1979,9 +1979,10 @@ StatusOr MetaClient::getSpaceDesc(const GraphSpaceID& space) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(space); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(space); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << space << " not found!"; return Status::Error("Space %d not found", space); } @@ -2002,9 +2003,10 @@ StatusOr> MetaClient::getTagSchemaFr if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt != metadata.localCache_.end()) { auto tagIt = spaceIt->second->tagSchemas_.find(tagID); if (tagIt != spaceIt->second->tagSchemas_.end() && !tagIt->second.empty()) { size_t vNum = tagIt->second.size(); @@ -2022,9 +2024,10 @@ StatusOr> MetaClient::getEdgeSchemaF if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt != metadata.localCache_.end()) { auto edgeIt = spaceIt->second->edgeSchemas_.find(edgeType); if (edgeIt != spaceIt->second->edgeSchemas_.end() && !edgeIt->second.empty()) { size_t vNum = edgeIt->second.size(); @@ -2041,9 +2044,10 @@ StatusOr MetaClient::getAllVerTagSchema(GraphSpaceID spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } return iter->second->tagSchemas_; @@ -2053,9 +2057,10 @@ StatusOr MetaClient::getAllLatestVerTagSchema(const GraphSpaceID& spa if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } TagSchema tagsSchema; @@ -2071,9 +2076,10 @@ StatusOr MetaClient::getAllVerEdgeSchema(GraphSpaceID spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } return iter->second->edgeSchemas_; @@ -2083,9 +2089,10 @@ StatusOr MetaClient::getAllLatestVerEdgeSchemaFromCache(const GraphS if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } EdgeSchema edgesSchema; @@ -2173,9 +2180,10 @@ StatusOr> MetaClient::getTagIndexFromCache(Grap return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2210,9 +2218,10 @@ StatusOr> MetaClient::getEdgeIndexFromCache(Gra return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2247,9 +2256,10 @@ StatusOr>> MetaClient::getTagIndexe return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2270,9 +2280,10 @@ StatusOr>> MetaClient::getEdgeIndex return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2346,9 +2357,10 @@ std::vector MetaClient::getRolesByUserFromCache(const std::strin if (!ready_) { return std::vector(0); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.userRolesMap_.find(user); - if (iter == threadLocalInfo.userRolesMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userRolesMap_.find(user); + if (iter == metadata.userRolesMap_.end()) { return std::vector(0); } return iter->second; @@ -2358,9 +2370,10 @@ bool MetaClient::authCheckFromCache(const std::string& account, const std::strin if (!ready_) { return false; } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.userPasswordMap_.find(account); - if (iter == threadLocalInfo.userPasswordMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userPasswordMap_.find(account); + if (iter == metadata.userPasswordMap_.end()) { return false; } return iter->second == password; @@ -2370,18 +2383,20 @@ bool MetaClient::checkShadowAccountFromCache(const std::string& account) { if (!ready_) { return false; } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.userPasswordMap_.find(account); - if (iter != threadLocalInfo.userPasswordMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userPasswordMap_.find(account); + if (iter != metadata.userPasswordMap_.end()) { return true; } return false; } StatusOr MetaClient::getTermFromCache(GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceInfo = threadLocalInfo.localCache_.find(spaceId); - if (spaceInfo == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceInfo = metadata.localCache_.find(spaceId); + if (spaceInfo == metadata.localCache_.end()) { return Status::Error("Term not found!"); } @@ -2398,8 +2413,9 @@ StatusOr> MetaClient::getStorageHosts() { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return threadLocalInfo.storageHosts_; + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return metadata.storageHosts_; } StatusOr MetaClient::getLatestTagVersionFromCache(const GraphSpaceID& space, @@ -2407,9 +2423,10 @@ StatusOr MetaClient::getLatestTagVersionFromCache(const GraphSpaceID& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceNewestTagVerMap_.find(std::make_pair(space, tagId)); - if (it == threadLocalInfo.spaceNewestTagVerMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceNewestTagVerMap_.find(std::make_pair(space, tagId)); + if (it == metadata.spaceNewestTagVerMap_.end()) { return Status::TagNotFound(); } return it->second; @@ -2420,9 +2437,10 @@ StatusOr MetaClient::getLatestEdgeVersionFromCache(const GraphSpaceID if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceNewestEdgeVerMap_.find(std::make_pair(space, edgeType)); - if (it == threadLocalInfo.spaceNewestEdgeVerMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceNewestEdgeVerMap_.find(std::make_pair(space, edgeType)); + if (it == metadata.spaceNewestEdgeVerMap_.end()) { return Status::EdgeNotFound(); } return it->second; @@ -2876,9 +2894,10 @@ MetaClient::getListenersBySpaceHostFromCache(GraphSpaceID spaceId, const HostAdd if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -2895,8 +2914,9 @@ StatusOr MetaClient::getListenersByHostFromCache(const HostAddr& h if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return doGetListenersMap(host, threadLocalInfo.localCache_); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return doGetListenersMap(host, metadata.localCache_); } ListenersMap MetaClient::doGetListenersMap(const HostAddr& host, const LocalCache& localCache) { @@ -2932,9 +2952,10 @@ StatusOr MetaClient::getListenerHostsBySpacePartType(GraphSpaceID spac if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -2953,9 +2974,10 @@ StatusOr> MetaClient::getListenerHostTypeBySpace if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -3044,8 +3066,9 @@ void MetaClient::updateNestedGflags(const std::unordered_map optionMap.emplace(value.first, value.second.toString()); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - for (const auto& spaceEntry : threadLocalInfo.localCache_) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + for (const auto& spaceEntry : metadata.localCache_) { listener_->onSpaceOptionUpdated(spaceEntry.first, optionMap); } } @@ -3332,11 +3355,11 @@ StatusOr> MetaClient::getServiceClientsFromCach if (!ready_) { return Status::Error("Not ready!"); } - - folly::RWSpinLock::ReadHolder holder(localCacheLock_); + folly::rcu_reader guard; + auto& metadata = *metadata_.load(); if (type == cpp2::ExternalServiceType::ELASTICSEARCH) { - auto sIter = serviceClientList_.find(type); - if (sIter != serviceClientList_.end()) { + auto sIter = metadata.serviceClientList_.find(type); + if (sIter != metadata.serviceClientList_.end()) { return sIter->second; } } @@ -3398,8 +3421,9 @@ StatusOr> MetaClient::getFTIndexe if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return threadLocalInfo.fulltextIndexMap_; + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return metadata.fulltextIndexMap_; } StatusOr> MetaClient::getFTIndexBySpaceFromCache( @@ -3407,9 +3431,10 @@ StatusOr> MetaClient::getFTIndexB if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); std::unordered_map indexes; - for (const auto& it : threadLocalInfo.fulltextIndexMap_) { + for (const auto& it : metadata.fulltextIndexMap_) { if (it.second.get_space_id() == spaceId) { indexes[it.first] = it.second; } @@ -3422,8 +3447,9 @@ StatusOr> MetaClient::getFTIndexBySpaceSch if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - for (auto& it : threadLocalInfo.fulltextIndexMap_) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + for (auto& it : metadata.fulltextIndexMap_) { auto id = it.second.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type ? it.second.get_depend_schema().get_edge_type() : it.second.get_depend_schema().get_tag_id(); @@ -3439,12 +3465,13 @@ StatusOr MetaClient::getFTIndexByNameFromCache(GraphSpaceID space if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - if (threadLocalInfo.fulltextIndexMap_.find(name) != fulltextIndexMap_.end() && - threadLocalInfo.fulltextIndexMap_.at(name).get_space_id() != spaceId) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + if (metadata.fulltextIndexMap_.find(name) != fulltextIndexMap_.end() && + metadata.fulltextIndexMap_.at(name).get_space_id() != spaceId) { return Status::IndexNotFound(); } - return threadLocalInfo.fulltextIndexMap_.at(name); + return metadata.fulltextIndexMap_.at(name); } folly::Future> MetaClient::createSession( @@ -3596,22 +3623,16 @@ bool MetaClient::loadSessions() { LOG(ERROR) << "List sessions failed, status:" << session_list.status(); return false; } - SessionMap* oldSessionMap = sessionMap_.load(); - SessionMap* newSessionMap = new SessionMap(*oldSessionMap); - auto oldKilledPlan = killedPlans_.load(); - auto newKilledPlan = new folly::F14FastSet>(*oldKilledPlan); + sessionMap_.clear(); + killedPlans_.clear(); for (auto& session : session_list.value().get_sessions()) { - (*newSessionMap)[session.get_session_id()] = session; + sessionMap_[session.get_session_id()] = session; for (auto& query : session.get_queries()) { if (query.second.get_status() == cpp2::QueryStatus::KILLING) { - newKilledPlan->insert({session.get_session_id(), query.first}); + killedPlans_.insert({session.get_session_id(), query.first}); } } } - sessionMap_.store(newSessionMap); - killedPlans_.store(newKilledPlan); - folly::rcu_retire(oldKilledPlan); - folly::rcu_retire(oldSessionMap); return true; } @@ -3620,9 +3641,9 @@ StatusOr MetaClient::getSessionFromCache(const nebula::SessionID& return Status::Error("Not ready!"); } folly::rcu_reader guard; - auto session_map = sessionMap_.load(); - auto it = session_map->find(session_id); - if (it != session_map->end()) { + auto& sessionMap = metadata_.load()->sessionMap_; + auto it = sessionMap.find(session_id); + if (it != sessionMap.end()) { return it->second; } return Status::SessionNotFound(); @@ -3636,7 +3657,7 @@ bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId) return false; } folly::rcu_reader guard; - return killedPlans_.load()->count({sessionId, planId}); + return metadata_.load()->killedPlans_.count({sessionId, planId}); } Status MetaClient::verifyVersion() { diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index b1b41f7df13..c5ed3474aac 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -784,7 +784,8 @@ class MetaClient { // Only report dir info once when started bool dirInfoReported_ = false; - struct ThreadLocalInfo { + + struct MetaData { int64_t localLastUpdateTime_{-2}; LocalCache localCache_; SpaceNameIdMap spaceIndexByName_; @@ -800,9 +801,12 @@ class MetaClient { std::vector storageHosts_; FTIndexMap fulltextIndexMap_; UserPasswordMap userPasswordMap_; - }; - const ThreadLocalInfo& getThreadLocalInfo(); + SessionMap sessionMap_; + folly::F14FastSet> killedPlans_; + + ServiceClientsList serviceClientList_; + }; void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col, ObjectPool* pool); @@ -829,7 +833,6 @@ class MetaClient { ServiceClientsList serviceClientList_; FTIndexMap fulltextIndexMap_; - mutable folly::RWSpinLock localCacheLock_; // The listener_ is the NebulaStore MetaChangedListener* listener_{nullptr}; // The lock used to protect listener_ @@ -847,8 +850,9 @@ class MetaClient { MetaClientOptions options_; std::vector storageHosts_; int64_t heartbeatTime_; - std::atomic sessionMap_; - std::atomic>*> killedPlans_; + SessionMap sessionMap_; + folly::F14FastSet> killedPlans_; + std::atomic metadata_; }; } // namespace meta diff --git a/src/storage/test/ChainTestUtils.h b/src/storage/test/ChainTestUtils.h index d94f30b2a74..0fd04ca00ee 100644 --- a/src/storage/test/ChainTestUtils.h +++ b/src/storage/test/ChainTestUtils.h @@ -226,14 +226,14 @@ class MetaClientTestUpdater { static void addLocalCache(meta::MetaClient& mClient, GraphSpaceID spaceId, std::shared_ptr spInfoCache) { - mClient.localCache_[spaceId] = spInfoCache; + mClient.metadata_.load()->localCache_[spaceId] = spInfoCache; } static meta::SpaceInfoCache* getLocalCache(meta::MetaClient* mClient, GraphSpaceID spaceId) { - if (mClient->localCache_.count(spaceId) == 0) { + if (mClient->metadata_.load()->localCache_.count(spaceId) == 0) { return nullptr; } - return mClient->localCache_[spaceId].get(); + return mClient->metadata_.load()->localCache_[spaceId].get(); } static void addPartTerm(meta::MetaClient* mClient, diff --git a/src/storage/test/KillQueryTest.cpp b/src/storage/test/KillQueryTest.cpp index 815ddad74a5..8516244c5bb 100644 --- a/src/storage/test/KillQueryTest.cpp +++ b/src/storage/test/KillQueryTest.cpp @@ -19,7 +19,7 @@ class KillQueryMetaWrapper { public: explicit KillQueryMetaWrapper(MetaClient* client) : client_(client) {} void killQuery(SessionID session_id, ExecutionPlanID plan_id) { - client_->killedPlans_.load()->emplace(session_id, plan_id); + client_->metadata_.load()->killedPlans_.emplace(session_id, plan_id); } private: