Skip to content

Commit

Permalink
Merge branch 'master' into test_upload_disk
Browse files Browse the repository at this point in the history
  • Loading branch information
Nivras authored Dec 7, 2021
2 parents 378251e + 6300233 commit 26f1202
Show file tree
Hide file tree
Showing 16 changed files with 269 additions and 33 deletions.
2 changes: 1 addition & 1 deletion src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2394,7 +2394,6 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
req.set_host(options_.localHost_);
req.set_role(options_.role_);
req.set_git_info_sha(options_.gitInfoSHA_);
req.set_version(getOriginVersion());
if (options_.role_ == cpp2::HostRole::STORAGE) {
if (options_.clusterId_.load() == 0) {
options_.clusterId_ = FileBasedClusterIdMan::getClusterIdFromFile(FLAGS_cluster_id_path);
Expand Down Expand Up @@ -3505,6 +3504,7 @@ bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId)

Status MetaClient::verifyVersion() {
auto req = cpp2::VerifyClientVersionReq();
req.set_host(options_.localHost_);
folly::Promise<StatusOr<cpp2::VerifyClientVersionResp>> promise;
auto future = promise.getFuture();
getResponse(
Expand Down
22 changes: 22 additions & 0 deletions src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace nebula {
static const std::unordered_map<std::string, std::pair<std::string, bool>> systemTableMaps = {
{"users", {"__users__", true}},
{"hosts", {"__hosts__", false}},
{"versions", {"__versions__", false}},
{"snapshots", {"__snapshots__", false}},
{"configs", {"__configs__", true}},
{"groups", {"__groups__", true}},
Expand Down Expand Up @@ -58,6 +59,7 @@ static const std::unordered_map<
static const std::string kSpacesTable = tableMaps.at("spaces").first; // NOLINT
static const std::string kPartsTable = tableMaps.at("parts").first; // NOLINT
static const std::string kHostsTable = systemTableMaps.at("hosts").first; // NOLINT
static const std::string kVersionsTable = systemTableMaps.at("versions").first; // NOLINT
static const std::string kTagsTable = tableMaps.at("tags").first; // NOLINT
static const std::string kEdgesTable = tableMaps.at("edges").first; // NOLINT
static const std::string kIndexesTable = tableMaps.at("indexes").first; // NOLINT
Expand Down Expand Up @@ -269,6 +271,26 @@ HostAddr MetaKeyUtils::parseHostKeyV2(folly::StringPiece key) {
return MetaKeyUtils::deserializeHostAddr(key);
}

std::string MetaKeyUtils::versionKey(const HostAddr& h) {
std::string key;
key.append(kVersionsTable.data(), kVersionsTable.size())
.append(MetaKeyUtils::serializeHostAddr(h));
return key;
}

std::string MetaKeyUtils::versionVal(const std::string& version) {
std::string val;
auto versionLen = version.size();
val.reserve(sizeof(int64_t) + versionLen);
val.append(reinterpret_cast<const char*>(&version), sizeof(int64_t)).append(version);
return val;
}

std::string MetaKeyUtils::parseVersion(folly::StringPiece val) {
auto len = *reinterpret_cast<const size_t*>(val.data());
return val.subpiece(sizeof(size_t), len).str();
}

std::string MetaKeyUtils::leaderKey(std::string addr, Port port) {
LOG(ERROR) << "deprecated function\n" << boost::stacktrace::stacktrace();
return leaderKeyV2(addr, port);
Expand Down
6 changes: 6 additions & 0 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ class MetaKeyUtils final {

static HostAddr parseHostKeyV2(folly::StringPiece key);

static std::string versionKey(const HostAddr& h);

static std::string versionVal(const std::string& version);

static std::string parseVersion(folly::StringPiece val);

static std::string leaderKey(std::string ip, Port port);

static std::string leaderKeyV2(std::string addr, Port port);
Expand Down
5 changes: 2 additions & 3 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -565,9 +565,7 @@ struct HBReq {
4: optional map<common.GraphSpaceID, list<LeaderInfo>>
(cpp.template = "std::unordered_map") leader_partIds;
5: binary git_info_sha,
// version of binary
6: optional binary version,
7: optional map<common.GraphSpaceID, map<binary, PartitionList>
6: optional map<common.GraphSpaceID, map<binary, PartitionList>
(cpp.template = "std::unordered_map")>
(cpp.template = "std::unordered_map") disk_parts;
}
Expand Down Expand Up @@ -1113,6 +1111,7 @@ struct VerifyClientVersionResp {

struct VerifyClientVersionReq {
1: required binary version = common.version;
2: common.HostAddr host;
}

service MetaService {
Expand Down
22 changes: 0 additions & 22 deletions src/meta/ActiveHostsMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ struct HostInfo {
int64_t lastHBTimeInMilliSec_ = 0;
cpp2::HostRole role_{cpp2::HostRole::UNKNOWN};
std::string gitInfoSha_;
// version of binary
folly::Optional<std::string> version_;

static HostInfo decode(const folly::StringPiece& data) {
if (data.size() == sizeof(int64_t)) {
Expand Down Expand Up @@ -71,12 +69,6 @@ struct HostInfo {
if (!info.gitInfoSha_.empty()) {
encode.append(info.gitInfoSha_.data(), len);
}

if (info.version_.has_value()) {
len = info.version_.value().size();
encode.append(reinterpret_cast<const char*>(&len), sizeof(std::size_t));
encode.append(info.version_.value().data(), len);
}
return encode;
}

Expand Down Expand Up @@ -104,20 +96,6 @@ struct HostInfo {
}

info.gitInfoSha_ = std::string(data.data() + offset, len);
offset += len;

if (offset == data.size()) {
return info;
}

len = *reinterpret_cast<const size_t*>(data.data() + offset);
offset += sizeof(size_t);

if (offset + len > data.size()) {
FLOG_FATAL("decode out of range, offset=%zu, actual=%zu", offset, data.size());
}

info.version_ = std::string(data.data() + offset, len);
return info;
}
};
Expand Down
3 changes: 0 additions & 3 deletions src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ void HBProcessor::process(const cpp2::HBReq& req) {
}

HostInfo info(time::WallClock::fastNowInMilliSec(), req.get_role(), req.get_git_info_sha());
if (req.version_ref().has_value()) {
info.version_ = *req.version_ref();
}
if (req.leader_partIds_ref().has_value()) {
ret = ActiveHostsMan::updateHostInfo(kvstore_, host, info, &*req.leader_partIds_ref());
} else {
Expand Down
6 changes: 6 additions & 0 deletions src/meta/processors/admin/VerifyClientVersionProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ void VerifyClientVersionProcessor::process(const cpp2::VerifyClientVersionReq& r
req.get_version().c_str(),
FLAGS_client_white_list.c_str()));
} else {
auto host = req.get_host();
auto versionKey = MetaKeyUtils::versionKey(host);
auto versionVal = MetaKeyUtils::versionVal(req.get_version().c_str());
std::vector<kvstore::KV> versionData;
versionData.emplace_back(std::move(versionKey), std::move(versionVal));
doSyncPut(versionData);
resp_.set_code(nebula::cpp2::ErrorCode::SUCCEEDED);
}
onFinished();
Expand Down
21 changes: 21 additions & 0 deletions src/meta/processors/index/DropEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,27 @@ void DropEdgeIndexProcessor::process(const cpp2::DropEdgeIndexReq& req) {
keys.emplace_back(MetaKeyUtils::indexIndexKey(spaceID, indexName));
keys.emplace_back(MetaKeyUtils::indexKey(spaceID, edgeIndexID));

auto indexItemRet = doGet(keys.back());
if (!nebula::ok(indexItemRet)) {
auto retCode = nebula::error(indexItemRet);
if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
retCode = nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND;
}
LOG(ERROR) << "Drop Edge Index Failed: SpaceID " << spaceID << " Index Name: " << indexName
<< " error: " << apache::thrift::util::enumNameSafe(retCode);
handleErrorCode(retCode);
onFinished();
return;
}

auto item = MetaKeyUtils::parseIndex(nebula::value(indexItemRet));
if (item.get_schema_id().getType() != nebula::cpp2::SchemaID::Type::edge_type) {
LOG(ERROR) << "Drop Edge Index Failed: Index Name " << indexName << " is not EdgeIndex";
resp_.set_code(nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND);
onFinished();
return;
}

LOG(INFO) << "Drop Edge Index " << indexName;
resp_.set_id(to(edgeIndexID, EntryType::INDEX));
doSyncMultiRemoveAndUpdate(std::move(keys));
Expand Down
21 changes: 21 additions & 0 deletions src/meta/processors/index/DropTagIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,27 @@ void DropTagIndexProcessor::process(const cpp2::DropTagIndexReq& req) {
keys.emplace_back(MetaKeyUtils::indexIndexKey(spaceID, indexName));
keys.emplace_back(MetaKeyUtils::indexKey(spaceID, tagIndexID));

auto indexItemRet = doGet(keys.back());
if (!nebula::ok(indexItemRet)) {
auto retCode = nebula::error(indexItemRet);
if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
retCode = nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND;
}
LOG(ERROR) << "Drop Tag Index Failed: SpaceID " << spaceID << " Index Name: " << indexName
<< " error: " << apache::thrift::util::enumNameSafe(retCode);
handleErrorCode(retCode);
onFinished();
return;
}

auto item = MetaKeyUtils::parseIndex(nebula::value(indexItemRet));
if (item.get_schema_id().getType() != nebula::cpp2::SchemaID::Type::tag_id) {
LOG(ERROR) << "Drop Tag Index Failed: Index Name " << indexName << " is not TagIndex";
resp_.set_code(nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND);
onFinished();
return;
}

LOG(INFO) << "Drop Tag Index " << indexName;
resp_.set_id(to(tagIndexID, EntryType::INDEX));
doSyncMultiRemoveAndUpdate(std::move(keys));
Expand Down
9 changes: 7 additions & 2 deletions src/meta/processors/parts/ListHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,14 @@ nebula::cpp2::ErrorCode ListHostsProcessor::allHostsWithStatus(cpp2::HostRole ro

item.set_role(info.role_);
item.set_git_info_sha(info.gitInfoSha_);
if (info.version_.has_value()) {
item.set_version(info.version_.value());

auto versionKey = MetaKeyUtils::versionKey(item.get_hostAddr());
auto versionRet = doGet(versionKey);
if (nebula::ok(versionRet)) {
auto versionVal = MetaKeyUtils::parseVersion(value(versionRet));
item.set_version(versionVal);
}

if (now - info.lastHBTimeInMilliSec_ < FLAGS_removed_threshold_sec * 1000) {
// If meta didn't receive heartbeat with 2 periods, regard hosts as
// offline. Same as ActiveHostsMan::getActiveHosts
Expand Down
15 changes: 15 additions & 0 deletions src/meta/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,18 @@ nebula_add_test(
wangle
gtest
)

nebula_add_test(
NAME
verify_client_version_test
SOURCES
VerifyClientVersionTest.cpp
OBJECTS
${meta_test_deps}
LIBRARIES
${ROCKSDB_LIBRARIES}
${THRIFT_LIBRARIES}
${PROXYGEN_LIBRARIES}
wangle
gtest
)
75 changes: 75 additions & 0 deletions src/meta/test/VerifyClientVersionTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include <gtest/gtest.h>

#include "common/base/Base.h"
#include "common/fs/TempDir.h"
#include "meta/processors/admin/HBProcessor.h"
#include "meta/processors/admin/VerifyClientVersionProcessor.h"
#include "meta/processors/parts/ListHostsProcessor.h"
#include "meta/test/TestUtils.h"

namespace nebula {
namespace meta {

TEST(VerifyClientVersionTest, VersionTest) {
fs::TempDir rootPath("/tmp/VersionTest.XXXXXX");
std::unique_ptr<kvstore::KVStore> kv(MockCluster::initMetaKV(rootPath.path()));
{
auto req = cpp2::VerifyClientVersionReq();
req.set_version("1.0.1");
auto* processor = VerifyClientVersionProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CLIENT_SERVER_INCOMPATIBLE, resp.get_code());
}
{
for (auto i = 0; i < 5; i++) {
auto req = cpp2::VerifyClientVersionReq();
req.set_host(HostAddr(std::to_string(i), i));
auto* processor = VerifyClientVersionProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
}
{
const ClusterID kClusterId = 10;
for (auto i = 0; i < 5; i++) {
auto req = cpp2::HBReq();
req.set_role(cpp2::HostRole::GRAPH);
req.set_host(HostAddr(std::to_string(i), i));
req.set_cluster_id(kClusterId);
auto* processor = HBProcessor::instance(kv.get(), nullptr, kClusterId);
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
}
{
auto req = cpp2::ListHostsReq();
req.set_type(cpp2::ListHostType::GRAPH);
auto* processor = ListHostsProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
ASSERT_EQ(resp.get_hosts().size(), 5);
}
}

} // namespace meta
} // namespace nebula

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, true);
google::SetStderrLogging(google::INFO);
return RUN_ALL_TESTS();
}
3 changes: 2 additions & 1 deletion src/parser/parser.yy
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ static constexpr size_t kCommentLengthLimit = 256;
%token <boolval> BOOL
%token <intval> INTEGER
%token <doubleval> DOUBLE
%token <strval> STRING VARIABLE LABEL IPV4
%token <strval> STRING VARIABLE LABEL IPV4 CHINESE_LABEL

%type <strval> name_label unreserved_keyword predicate_name
%type <expr> expression
Expand Down Expand Up @@ -406,6 +406,7 @@ static constexpr size_t kCommentLengthLimit = 256;

name_label
: LABEL { $$ = $1; }
| CHINESE_LABEL { $$ = $1; }
| unreserved_keyword { $$ = $1; }
;

Expand Down
17 changes: 16 additions & 1 deletion src/parser/scanner.lex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ HEX ([0-9a-fA-F])
OCT ([0-7])
IP_OCTET ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])


U [\x80-\xbf]
U2 [\xc2-\xdf]
U3 [\xe0-\xef]
U4 [\xf0-\xf4]
CHINESE_LABEL ({U2}{U}|{U3}{U}{U}|{U4}{U}{U}{U})+

%%

Expand Down Expand Up @@ -467,6 +471,17 @@ IP_OCTET ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])
// Must match /* */
throw GraphParser::syntax_error(*yylloc, "unterminated comment");
}
\`{CHINESE_LABEL}\` {
yylval->strval = new std::string(yytext + 1, yyleng - 2);
if (yylval->strval->size() > MAX_STRING) {
auto error = "Out of range of the LABEL length, "
"the max length of LABEL is " +
std::to_string(MAX_STRING) + ":";
delete yylval->strval;
throw GraphParser::syntax_error(*yylloc, error);
}
return TokenType::CHINESE_LABEL;
}
. {
/**
* Any other unmatched byte sequences will get us here,
Expand Down
12 changes: 12 additions & 0 deletions tests/tck/features/index/TagEdgeIndex.feature
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ Feature: tag and edge index tests from pytest
Then the result should be, in any order:
| Tag Index Name | Create Tag Index |
| 'multi_tag_index' | 'CREATE TAG INDEX `multi_tag_index` ON `tag_1` (\n `col2`,\n `col3`\n)' |
# Check if check tag/edge type before drop index
When executing query:
"""
DROP EDGE INDEX multi_tag_index
"""
Then an ExecutionError should be raised at runtime.
When executing query:
"""
DROP TAG INDEX multi_tag_index
Expand Down Expand Up @@ -405,6 +411,12 @@ Feature: tag and edge index tests from pytest
Then the result should be, in any order:
| Edge Index Name | Create Edge Index |
| 'multi_edge_index' | 'CREATE EDGE INDEX `multi_edge_index` ON `edge_1` (\n `col2`,\n `col3`\n)' |
# Check if check tag/edge type before drop index
When executing query:
"""
DROP TAG INDEX multi_edge_index
"""
Then an ExecutionError should be raised at runtime.
# Check if show create edge index works well
When executing query:
"""
Expand Down
Loading

0 comments on commit 26f1202

Please sign in to comment.