Skip to content

Commit

Permalink
Merge branch 'master' into fix-cluster-br
Browse files Browse the repository at this point in the history
  • Loading branch information
Sophie-Xie authored Dec 30, 2021
2 parents b80ec9d + 321f549 commit 60cc1a0
Show file tree
Hide file tree
Showing 129 changed files with 3,407 additions and 1,240 deletions.
54 changes: 33 additions & 21 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ bool MetaClient::loadData() {
return false;
}

if (!loadFulltextClients()) {
LOG(ERROR) << "Load fulltext services Failed";
if (!loadGlobalServiceClients()) {
LOG(ERROR) << "Load global services Failed";
return false;
}

Expand Down Expand Up @@ -519,15 +519,15 @@ bool MetaClient::loadListeners(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCa
return true;
}

bool MetaClient::loadFulltextClients() {
auto ftRet = listFTClients().get();
if (!ftRet.ok()) {
LOG(ERROR) << "List fulltext services failed, status:" << ftRet.status();
bool MetaClient::loadGlobalServiceClients() {
auto ret = listServiceClients(cpp2::ExternalServiceType::ELASTICSEARCH).get();
if (!ret.ok()) {
LOG(ERROR) << "List services failed, status:" << ret.status();
return false;
}
{
folly::RWSpinLock::WriteHolder holder(localCacheLock_);
fulltextClientList_ = std::move(ftRet).value();
serviceClientList_ = std::move(ret).value();
}
return true;
}
Expand Down Expand Up @@ -3277,16 +3277,16 @@ folly::Future<StatusOr<nebula::cpp2::ErrorCode>> MetaClient::reportTaskFinish(
return fut;
}

folly::Future<StatusOr<bool>> MetaClient::signInFTService(
cpp2::FTServiceType type, const std::vector<cpp2::FTClient>& clients) {
cpp2::SignInFTServiceReq req;
folly::Future<StatusOr<bool>> MetaClient::signInService(
const cpp2::ExternalServiceType& type, const std::vector<cpp2::ServiceClient>& clients) {
cpp2::SignInServiceReq req;
req.type_ref() = type;
req.clients_ref() = clients;
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_signInFTService(request); },
[](auto client, auto request) { return client->future_signInService(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
Expand All @@ -3295,13 +3295,14 @@ folly::Future<StatusOr<bool>> MetaClient::signInFTService(
return future;
}

folly::Future<StatusOr<bool>> MetaClient::signOutFTService() {
cpp2::SignOutFTServiceReq req;
folly::Future<StatusOr<bool>> MetaClient::signOutService(const cpp2::ExternalServiceType& type) {
cpp2::SignOutServiceReq req;
req.type_ref() = type;
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_signOutFTService(request); },
[](auto client, auto request) { return client->future_signOutService(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
Expand All @@ -3310,25 +3311,36 @@ folly::Future<StatusOr<bool>> MetaClient::signOutFTService() {
return future;
}

folly::Future<StatusOr<std::vector<cpp2::FTClient>>> MetaClient::listFTClients() {
cpp2::ListFTClientsReq req;
folly::Promise<StatusOr<std::vector<cpp2::FTClient>>> promise;
folly::Future<StatusOr<ServiceClientsList>> MetaClient::listServiceClients(
const cpp2::ExternalServiceType& type) {
cpp2::ListServiceClientsReq req;
req.type_ref() = type;
folly::Promise<StatusOr<ServiceClientsList>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_listFTClients(request); },
[](cpp2::ListFTClientsResp&& resp) -> decltype(auto) {
[](auto client, auto request) { return client->future_listServiceClients(request); },
[](cpp2::ListServiceClientsResp&& resp) -> decltype(auto) {
return std::move(resp).get_clients();
},
std::move(promise));
return future;
}

StatusOr<std::vector<cpp2::FTClient>> MetaClient::getFTClientsFromCache() {
StatusOr<std::vector<cpp2::ServiceClient>> MetaClient::getServiceClientsFromCache(
const cpp2::ExternalServiceType& type) {
if (!ready_) {
return Status::Error("Not ready!");
}
return fulltextClientList_;

folly::RWSpinLock::ReadHolder holder(localCacheLock_);
if (type == cpp2::ExternalServiceType::ELASTICSEARCH) {
auto sIter = serviceClientList_.find(type);
if (sIter != serviceClientList_.end()) {
return sIter->second;
}
}
return Status::Error("Service not found!");
}

folly::Future<StatusOr<bool>> MetaClient::createFTIndex(const std::string& name,
Expand Down
27 changes: 16 additions & 11 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ using Indexes = std::unordered_map<IndexID, std::shared_ptr<cpp2::IndexItem>>;
using Listeners =
std::unordered_map<HostAddr, std::vector<std::pair<PartitionID, cpp2::ListenerType>>>;

// Get services
using ServiceClientsList =
std::unordered_map<cpp2::ExternalServiceType, std::vector<cpp2::ServiceClient>>;

struct SpaceInfoCache {
cpp2::SpaceDesc spaceDesc_;
PartsAlloc partsAlloc_;
Expand Down Expand Up @@ -144,9 +148,6 @@ using UserPasswordMap = std::unordered_map<std::string, std::string>;
using MetaConfigMap =
std::unordered_map<std::pair<cpp2::ConfigModule, std::string>, cpp2::ConfigItem>;

// get fulltext services
using FulltextClientsList = std::vector<cpp2::FTClient>;

using FTIndexMap = std::unordered_map<std::string, cpp2::FTIndex>;

using SessionMap = std::unordered_map<SessionID, cpp2::Session>;
Expand Down Expand Up @@ -447,15 +448,17 @@ class MetaClient {
StatusOr<std::vector<RemoteListenerInfo>> getListenerHostTypeBySpacePartType(GraphSpaceID spaceId,
PartitionID partId);

// Operations for fulltext services
folly::Future<StatusOr<bool>> signInFTService(cpp2::FTServiceType type,
const std::vector<cpp2::FTClient>& clients);
// Operations for services
folly::Future<StatusOr<bool>> signInService(const cpp2::ExternalServiceType& type,
const std::vector<cpp2::ServiceClient>& clients);

folly::Future<StatusOr<bool>> signOutFTService();
folly::Future<StatusOr<bool>> signOutService(const cpp2::ExternalServiceType& type);

folly::Future<StatusOr<std::vector<cpp2::FTClient>>> listFTClients();
folly::Future<StatusOr<ServiceClientsList>> listServiceClients(
const cpp2::ExternalServiceType& type);

StatusOr<std::vector<cpp2::FTClient>> getFTClientsFromCache();
StatusOr<std::vector<cpp2::ServiceClient>> getServiceClientsFromCache(
const cpp2::ExternalServiceType& type);

// Operations for fulltext index.

Expand Down Expand Up @@ -682,7 +685,7 @@ class MetaClient {

bool loadListeners(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCache> cache);

bool loadFulltextClients();
bool loadGlobalServiceClients();

bool loadFulltextIndexes();

Expand Down Expand Up @@ -815,7 +818,9 @@ class MetaClient {

NameIndexMap tagNameIndexMap_;
NameIndexMap edgeNameIndexMap_;
FulltextClientsList fulltextClientList_;

// Global service client
ServiceClientsList serviceClientList_;
FTIndexMap fulltextIndexMap_;

mutable folly::RWSpinLock localCacheLock_;
Expand Down
45 changes: 44 additions & 1 deletion src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
auto partId = directReq.get_parts().begin()->first;
auto optLeader = getLeader(directReq.get_space_id(), partId);
if (!optLeader.ok()) {
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId);
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId)
<< optLeader.status();
p.setValue(::nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
return;
}
Expand Down Expand Up @@ -131,5 +132,47 @@ cpp2::ChainAddEdgesRequest InternalStorageClient::makeChainAddReq(const cpp2::Ad
return ret;
}

void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
const std::string& txnId,
TermID termId,
folly::Promise<nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb) {
auto spaceId = req.get_space_id();
auto partId = req.get_parts().begin()->first;
auto optLeader = getLeader(req.get_space_id(), partId);
if (!optLeader.ok()) {
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId)
<< optLeader.status();
p.setValue(::nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
return;
}
HostAddr& leader = optLeader.value();
leader.port += kInternalPortOffset;
VLOG(2) << "leader host: " << leader;

cpp2::ChainDeleteEdgesRequest chainReq;
chainReq.space_id_ref() = req.get_space_id();
chainReq.parts_ref() = req.get_parts();
chainReq.txn_id_ref() = txnId;
chainReq.term_ref() = termId;
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainDeleteEdgesRequest& r) {
return client->future_chainDeleteEdges(r);
});

std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainDeleteEdges(req, txnId, termId, std::move(p));
} else {
p.setValue(code);
}
return;
});
}

} // namespace storage
} // namespace nebula
6 changes: 6 additions & 0 deletions src/clients/storage/InternalStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ class InternalStorageClient
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb = nullptr);

virtual void chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
const std::string& txnId,
TermID termId,
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb = nullptr);

private:
cpp2::ChainAddEdgesRequest makeChainAddReq(const cpp2::AddEdgesRequest& req,
TermID termId,
Expand Down
6 changes: 4 additions & 2 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,10 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::deleteEdges(

return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::DeleteEdgesRequest& r) {
return client->future_deleteEdges(r);
[useToss = param.useExperimentalFeature](
ThriftClientType* client, const cpp2::DeleteEdgesRequest& r) {
return useToss ? client->future_chainDeleteEdges(r)
: client->future_deleteEdges(r);
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/SchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class SchemaManager {
// get all latest version of all edge schema
virtual StatusOr<EdgeSchema> getAllLatestVerEdgeSchema(GraphSpaceID space) = 0;

virtual StatusOr<std::vector<nebula::meta::cpp2::FTClient>> getFTClients() = 0;
virtual StatusOr<std::vector<nebula::meta::cpp2::ServiceClient>> getServiceClients(
cpp2::ExternalServiceType type) = 0;

// Get the TagID or EdgeType by the name.
// The first one is a bool which is used to distinguish the type.
Expand Down
7 changes: 4 additions & 3 deletions src/common/meta/ServerBasedSchemaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,14 @@ StatusOr<EdgeSchema> ServerBasedSchemaManager::getAllLatestVerEdgeSchema(GraphSp
return metaClient_->getAllLatestVerEdgeSchemaFromCache(space);
}

StatusOr<std::vector<nebula::meta::cpp2::FTClient>> ServerBasedSchemaManager::getFTClients() {
auto ret = metaClient_->getFTClientsFromCache();
StatusOr<std::vector<nebula::meta::cpp2::ServiceClient>>
ServerBasedSchemaManager::getServiceClients(meta::cpp2::ExternalServiceType type) {
auto ret = metaClient_->getServiceClientsFromCache(type);
if (!ret.ok()) {
return ret.status();
}
if (ret.value().empty()) {
return Status::Error("fulltext client list is empty");
return Status::Error("Service list is empty");
}
return std::move(ret).value();
}
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/ServerBasedSchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class ServerBasedSchemaManager : public SchemaManager {
// get all latest version of all edges
StatusOr<EdgeSchema> getAllLatestVerEdgeSchema(GraphSpaceID space) override;

StatusOr<std::vector<nebula::meta::cpp2::FTClient>> getFTClients() override;
StatusOr<std::vector<nebula::meta::cpp2::ServiceClient>> getServiceClients(
cpp2::ExternalServiceType type) override;

StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
GraphSpaceID spaceId, int32_t schemaId) override;
Expand Down
38 changes: 22 additions & 16 deletions src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static const std::unordered_map<std::string, std::pair<std::string, bool>> syste
{"configs", {"__configs__", true}},
{"groups", {"__groups__", true}},
{"zones", {"__zones__", true}},
{"ft_service", {"__ft_service__", false}},
{"services", {"__services__", false}},
{"sessions", {"__sessions__", true}}};

// SystemInfo will always be backed up
Expand Down Expand Up @@ -89,7 +89,7 @@ static const std::string kBalancePlanTable = tableMaps.at("balance_plan").fir
static const std::string kLocalIdTable = tableMaps.at("local_id").first; // NOLINT

const std::string kFTIndexTable = tableMaps.at("ft_index").first; // NOLINT
const std::string kFTServiceTable = systemTableMaps.at("ft_service").first; // NOLINT
const std::string kServicesTable = systemTableMaps.at("services").first; // NOLINT
const std::string kSessionsTable = systemTableMaps.at("sessions").first; // NOLINT

const std::string kIdKey = systemInfoMaps.at("autoIncrementId").first; // NOLINT
Expand Down Expand Up @@ -1155,27 +1155,33 @@ const std::string& MetaKeyUtils::statsKeyPrefix() {
return kStatsTable;
}

std::string MetaKeyUtils::fulltextServiceKey() {
std::string MetaKeyUtils::serviceKey(const meta::cpp2::ExternalServiceType& type) {
std::string key;
key.reserve(kFTServiceTable.size());
key.append(kFTServiceTable.data(), kFTServiceTable.size());
key.reserve(kServicesTable.size() + sizeof(meta::cpp2::ExternalServiceType));
key.append(kServicesTable.data(), kServicesTable.size())
.append(reinterpret_cast<const char*>(&type), sizeof(meta::cpp2::ExternalServiceType));
return key;
}

std::string MetaKeyUtils::fulltextServiceVal(meta::cpp2::FTServiceType type,
const std::vector<meta::cpp2::FTClient>& clients) {
std::string val, cval;
apache::thrift::CompactSerializer::serialize(clients, &cval);
val.reserve(sizeof(meta::cpp2::FTServiceType) + cval.size());
val.append(reinterpret_cast<const char*>(&type), sizeof(meta::cpp2::FTServiceType)).append(cval);
const std::string& MetaKeyUtils::servicePrefix() {
return kServicesTable;
}

meta::cpp2::ExternalServiceType MetaKeyUtils::parseServiceType(folly::StringPiece rawData) {
auto offset = kServicesTable.size();
return *reinterpret_cast<const meta::cpp2::ExternalServiceType*>(rawData.data() + offset);
}

std::string MetaKeyUtils::serviceVal(const std::vector<meta::cpp2::ServiceClient>& clients) {
std::string val;
apache::thrift::CompactSerializer::serialize(clients, &val);
return val;
}

std::vector<meta::cpp2::FTClient> MetaKeyUtils::parseFTClients(folly::StringPiece rawData) {
std::vector<meta::cpp2::FTClient> clients;
int32_t offset = sizeof(meta::cpp2::FTServiceType);
auto clientsRaw = rawData.subpiece(offset, rawData.size() - offset);
apache::thrift::CompactSerializer::deserialize(clientsRaw, clients);
std::vector<meta::cpp2::ServiceClient> MetaKeyUtils::parseServiceClients(
folly::StringPiece rawData) {
std::vector<meta::cpp2::ServiceClient> clients;
apache::thrift::CompactSerializer::deserialize(rawData, clients);
return clients;
}

Expand Down
11 changes: 7 additions & 4 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,15 @@ class MetaKeyUtils final {

static GraphSpaceID parseStatsSpace(folly::StringPiece rawData);

static std::string fulltextServiceKey();
static std::string serviceKey(const meta::cpp2::ExternalServiceType& type);

static std::string fulltextServiceVal(meta::cpp2::FTServiceType type,
const std::vector<meta::cpp2::FTClient>& clients);
static std::string serviceVal(const std::vector<meta::cpp2::ServiceClient>& client);

static std::vector<meta::cpp2::FTClient> parseFTClients(folly::StringPiece rawData);
static const std::string& servicePrefix();

static meta::cpp2::ExternalServiceType parseServiceType(folly::StringPiece rawData);

static std::vector<meta::cpp2::ServiceClient> parseServiceClients(folly::StringPiece rawData);

static const std::string& sessionPrefix();

Expand Down
2 changes: 2 additions & 0 deletions src/common/utils/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ enum class NebulaKeyType : uint32_t {
kOperation = 0x00000005,
kKeyValue = 0x00000006,
kVertex = 0x00000007,
kPrime = 0x00000008, // used in TOSS, if we write a lock succeed
kDoublePrime = 0x00000009, // used in TOSS, if we get RPC back from remote.
};

enum class NebulaSystemKeyType : uint32_t {
Expand Down
Loading

0 comments on commit 60cc1a0

Please sign in to comment.