Skip to content

Commit

Permalink
Support meta upgrade v3
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet committed Dec 24, 2021
1 parent 63db177 commit df20526
Show file tree
Hide file tree
Showing 12 changed files with 420 additions and 27 deletions.
1 change: 1 addition & 0 deletions src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:meta_http_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:meta_v2_thrift_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
${common_deps}
${storage_meta_deps}
Expand Down
32 changes: 15 additions & 17 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,26 +140,24 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
LOG(ERROR) << "Meta version is invalid";
return nullptr;
} else if (version == nebula::meta::MetaVersion::V1) {
if (leader == localhost) {
LOG(INFO) << "I am leader, begin upgrade meta data";
// need to upgrade the v1.0 meta data format to v2.0 meta data format
auto ret = nebula::meta::MetaVersionMan::updateMetaV1ToV2(kvstore.get());
if (!ret.ok()) {
LOG(ERROR) << ret;
return nullptr;
}
} else {
LOG(INFO) << "I am follower, wait for leader to sync upgrade";
while (version != nebula::meta::MetaVersion::V2) {
VLOG(1) << "Waiting for leader to upgrade";
sleep(1);
version = nebula::meta::MetaVersionMan::getMetaVersionFromKV(kvstore.get());
}
// need to upgrade the v1.0 meta data format to v2.0
auto ret = nebula::meta::MetaVersionMan::updateMetaV1ToV2(kvstore.get());
if (!ret.ok()) {
LOG(ERROR) << ret;
return nullptr;
}
} else if (version == nebula::meta::MetaVersion::V2) {
// need to upgrade the v2.0 meta data format to v3.0
auto ret = nebula::meta::MetaVersionMan::updateMetaV2ToV3(kvstore.get());
if (!ret.ok()) {
LOG(ERROR) << ret;
return nullptr;
}
}

if (leader == localhost) {
nebula::meta::MetaVersionMan::setMetaVersionToKV(kvstore.get());
if (!nebula::meta::MetaVersionMan::setMetaVersionToKV(kvstore.get())) {
LOG(ERROR) << "Meta Version save failed";
return nullptr;
}

LOG(INFO) << "Nebula store init succeeded, clusterId " << gClusterId;
Expand Down
2 changes: 2 additions & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,14 @@ nebula_add_library(
add_dependencies(
meta_version_man_obj
meta_v1_thrift_obj
meta_v2_thrift_obj
)

set(meta_test_deps
$<TARGET_OBJECTS:mock_obj>
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v2_thrift_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
Expand Down
92 changes: 86 additions & 6 deletions src/meta/MetaVersionMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "meta/processors/job/JobUtils.h"
#include "meta/upgrade/MetaDataUpgrade.h"
#include "meta/upgrade/v1/MetaServiceUtilsV1.h"
#include "meta/upgrade/v2/MetaServiceUtilsV2.h"

DEFINE_bool(null_type, true, "set schema to support null type");
DEFINE_bool(print_info, false, "enable to print the rewrite data");
Expand Down Expand Up @@ -51,10 +52,10 @@ MetaVersion MetaVersionMan::getVersionByHost(kvstore::KVStore* kv) {
// static
bool MetaVersionMan::setMetaVersionToKV(kvstore::KVStore* kv) {
CHECK_NOTNULL(kv);
auto v2 = MetaVersion::V2;
auto v3 = MetaVersion::V3;
std::vector<kvstore::KV> data;
data.emplace_back(kMetaVersionKey,
std::string(reinterpret_cast<const char*>(&v2), sizeof(MetaVersion)));
std::string(reinterpret_cast<const char*>(&v3), sizeof(MetaVersion)));
bool ret = true;
folly::Baton<true, std::atomic> baton;
kv->asyncMultiPut(
Expand All @@ -63,7 +64,7 @@ bool MetaVersionMan::setMetaVersionToKV(kvstore::KVStore* kv) {
LOG(ERROR) << "Put failed, error: " << static_cast<int32_t>(code);
ret = false;
} else {
LOG(INFO) << "Write meta version 2 succeeds";
LOG(INFO) << "Write meta version 3 succeeds";
}
baton.post();
});
Expand All @@ -80,7 +81,28 @@ Status MetaVersionMan::updateMetaV1ToV2(kvstore::KVStore* kv) {
LOG(ERROR) << "Create snapshot failed: " << snapshot;
return Status::Error("Create snapshot failed");
}
auto status = doUpgrade(kv);
auto status = doUpgradeV1ToV2(kv);
if (!status.ok()) {
// rollback by snapshot
return status;
}
// delete snapshot file
auto dmRet = kv->dropCheckpoint(kDefaultSpaceId, snapshot);
if (dmRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Delete snapshot: " << snapshot << " failed, You need to delete it manually";
}
return Status::OK();
}

Status MetaVersionMan::updateMetaV2ToV3(kvstore::KVStore* kv) {
CHECK_NOTNULL(kv);
auto snapshot = folly::format("META_UPGRADE_SNAPSHOT_{}", MetaKeyUtils::genTimestampStr()).str();
auto meteRet = kv->createCheckpoint(kDefaultSpaceId, snapshot);
if (meteRet.isLeftType()) {
LOG(ERROR) << "Create snapshot failed: " << snapshot;
return Status::Error("Create snapshot failed");
}
auto status = doUpgradeV2ToV3(kv);
if (!status.ok()) {
// rollback by snapshot
return status;
Expand All @@ -94,7 +116,7 @@ Status MetaVersionMan::updateMetaV1ToV2(kvstore::KVStore* kv) {
}

// static
Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
MetaDataUpgrade upgrader(kv);
{
// kSpacesTable
Expand All @@ -105,7 +127,7 @@ Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
Status status = Status::OK();
while (iter->valid()) {
if (FLAGS_print_info) {
upgrader.printSpaces(iter->val());
upgrader.printSpacesV1(iter->val());
}
status = upgrader.rewriteSpaces(iter->key(), iter->val());
if (!status.ok()) {
Expand Down Expand Up @@ -158,6 +180,7 @@ Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
}
}
}

{
// kLeadersTable
auto prefix = nebula::meta::v1::kLeadersTable;
Expand All @@ -178,6 +201,7 @@ Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
}
}
}

{
// kTagsTable
auto prefix = nebula::meta::v1::kTagsTable;
Expand Down Expand Up @@ -334,5 +358,61 @@ Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
}
}

Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) {
MetaDataUpgrade upgrader(kv);
// Step 1: Upgrade HeartBeat into machine list
{
const auto& prefix = MetaKeyUtils::hostPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get active hosts failed";
return Status::Error("Get hosts failed");
}

std::vector<kvstore::KV> data;
while (iter->valid()) {
// Save the machine information
auto host = MetaKeyUtils::parseHostKey(iter->key());
auto machineKey = MetaKeyUtils::machineKey(host.host, host.port);
data.emplace_back(std::move(machineKey), "");

// Save the zone information
auto zoneName = folly::stringPrintf("default_zone_%s_%d", host.host.c_str(), host.port);
auto zoneKey = MetaKeyUtils::zoneKey(std::move(zoneName));
auto zoneVal = MetaKeyUtils::zoneVal({host});
data.emplace_back(std::move(zoneKey), std::move(zoneVal));
iter->next();
}
}

// Step 2: Update Create space properties about Group
{
const auto& prefix = MetaKeyUtils::spacePrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get spaces failed";
return Status::Error("Get spaces failed");
}

while (iter->valid()) {
if (FLAGS_print_info) {
upgrader.printSpacesV2(iter->val());
}
auto spaceProperties = meta::v2::MetaServiceUtilsV2::parseSpace(iter->val());
if (spaceProperties.group_name_ref().has_value()) {
Status status = upgrader.rewriteSpacesV2ToV3(iter->key(), iter->val());
if (!status.ok()) {
LOG(ERROR) << status;
return status;
}
}
iter->next();
}
}
return Status::OK();
}

} // namespace meta
} // namespace nebula
9 changes: 7 additions & 2 deletions src/meta/MetaVersionMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ enum class MetaVersion {
UNKNOWN = 0,
V1 = 1,
V2 = 2,
V3 = 3,
};

/**
Expand All @@ -32,13 +33,17 @@ class MetaVersionMan final {

static Status updateMetaV1ToV2(kvstore::KVStore* kv);

static Status updateMetaV2ToV3(kvstore::KVStore* kv);

private:
static MetaVersion getVersionByHost(kvstore::KVStore* kv);

static Status doUpgrade(kvstore::KVStore* kv);
static Status doUpgradeV1ToV2(kvstore::KVStore* kv);

static Status doUpgradeV2ToV3(kvstore::KVStore* kv);
};

} // namespace meta
} // namespace nebula

#endif // META_ROOTUSERMAN_H_
#endif // META_METAVERSIONMAN_H_
3 changes: 3 additions & 0 deletions src/meta/upgrade/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ nebula_add_library(
meta_data_upgrade_obj OBJECT
MetaDataUpgrade.cpp
v1/MetaServiceUtilsV1.cpp
v2/MetaServiceUtilsV2.cpp
)

add_dependencies(
meta_data_upgrade_obj
meta_v1_thrift_obj
meta_v2_thrift_obj
)

nebula_add_subdirectory(v1)
nebula_add_subdirectory(v2)
Loading

0 comments on commit df20526

Please sign in to comment.