Skip to content

Commit

Permalink
Support meta upgrade v3
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet committed Dec 30, 2021
1 parent 426d816 commit d887631
Show file tree
Hide file tree
Showing 15 changed files with 527 additions and 33 deletions.
1 change: 1 addition & 0 deletions src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:meta_http_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:meta_v2_thrift_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
${common_deps}
${storage_meta_deps}
Expand Down
122 changes: 122 additions & 0 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,128 @@ static Status setupSignalHandler();
extern Status setupBreakpad();
#endif

namespace nebula {
namespace meta {
const std::string kClusterIdKey = "__meta_cluster_id_key__"; // NOLINT
} // namespace meta
} // namespace nebula

nebula::ClusterID gClusterId = 0;

std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> peers,
nebula::HostAddr localhost) {
auto partMan = std::make_unique<nebula::kvstore::MemPartManager>();
// The meta server has only one space (0), one part (0)
partMan->addPart(nebula::kDefaultSpaceId, nebula::kDefaultPartId, std::move(peers));
// folly IOThreadPoolExecutor
auto ioPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);
std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager(
apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager(
FLAGS_num_worker_threads, true /*stats*/));
threadManager->setNamePrefix("executor");
threadManager->start();
nebula::kvstore::KVOptions options;
options.dataPaths_ = {FLAGS_data_path};
options.partMan_ = std::move(partMan);
auto kvstore = std::make_unique<nebula::kvstore::NebulaStore>(
std::move(options), ioPool, localhost, threadManager);
if (!(kvstore->init())) {
LOG(ERROR) << "Nebula store init failed";
return nullptr;
}

LOG(INFO) << "Waiting for the leader elected...";
nebula::HostAddr leader;
while (true) {
auto ret = kvstore->partLeader(nebula::kDefaultSpaceId, nebula::kDefaultPartId);
if (!nebula::ok(ret)) {
LOG(ERROR) << "Nebula store init failed";
return nullptr;
}
leader = nebula::value(ret);
if (leader != nebula::HostAddr("", 0)) {
break;
}
LOG(INFO) << "Leader has not been elected, sleep 1s";
sleep(1);
}

gClusterId =
nebula::meta::ClusterIdMan::getClusterIdFromKV(kvstore.get(), nebula::meta::kClusterIdKey);
if (gClusterId == 0) {
if (leader == localhost) {
LOG(INFO) << "I am leader, create cluster Id";
gClusterId = nebula::meta::ClusterIdMan::create(FLAGS_meta_server_addrs);
if (!nebula::meta::ClusterIdMan::persistInKV(
kvstore.get(), nebula::meta::kClusterIdKey, gClusterId)) {
LOG(ERROR) << "Persist cluster failed!";
return nullptr;
}
} else {
LOG(INFO) << "I am follower, wait for the leader's clusterId";
while (gClusterId == 0) {
LOG(INFO) << "Waiting for the leader's clusterId";
sleep(1);
gClusterId = nebula::meta::ClusterIdMan::getClusterIdFromKV(kvstore.get(),
nebula::meta::kClusterIdKey);
}
}
}

auto version = nebula::meta::MetaVersionMan::getMetaVersionFromKV(kvstore.get());
LOG(INFO) << "Get meta version is " << static_cast<int32_t>(version);
if (version == nebula::meta::MetaVersion::UNKNOWN) {
LOG(ERROR) << "Meta version is invalid";
return nullptr;
} else if (version == nebula::meta::MetaVersion::V1) {
// need to upgrade the v1.0 meta data format to v2.0
auto ret = nebula::meta::MetaVersionMan::updateMetaV1ToV2(kvstore.get());
if (!ret.ok()) {
LOG(ERROR) << ret;
return nullptr;
}
} else if (version == nebula::meta::MetaVersion::V2) {
// need to upgrade the v2.0 meta data format to v3.0
auto ret = nebula::meta::MetaVersionMan::updateMetaV2ToV3(kvstore.get());
if (!ret.ok()) {
LOG(ERROR) << ret;
return nullptr;
}
}

if (!nebula::meta::MetaVersionMan::setMetaVersionToKV(kvstore.get())) {
LOG(ERROR) << "Meta Version save failed";
return nullptr;
}

LOG(INFO) << "Nebula store init succeeded, clusterId " << gClusterId;
return kvstore;
}

Status initWebService(nebula::WebService* svc,
nebula::kvstore::KVStore* kvstore,
nebula::hdfs::HdfsCommandHelper* helper,
nebula::thread::GenericThreadPool* pool) {
LOG(INFO) << "Starting Meta HTTP Service";
auto& router = svc->router();
router.get("/download-dispatch").handler([kvstore, helper, pool](PathParams&&) {
auto handler = new nebula::meta::MetaHttpDownloadHandler();
handler->init(kvstore, helper, pool);
return handler;
});
router.get("/ingest-dispatch").handler([kvstore, pool](PathParams&&) {
auto handler = new nebula::meta::MetaHttpIngestHandler();
handler->init(kvstore, pool);
return handler;
});
router.get("/replace").handler([kvstore](PathParams&&) {
auto handler = new nebula::meta::MetaHttpReplaceHostHandler();
handler->init(kvstore);
return handler;
});
return svc->start();
}

int main(int argc, char* argv[]) {
google::SetVersionString(nebula::versionString());
// Detect if the server has already been started
Expand Down
9 changes: 0 additions & 9 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -866,18 +866,9 @@ struct DropZoneReq {
1: binary zone_name,
}

<<<<<<< HEAD
struct DivideZoneReq {
1: binary zone_name,
2: map<binary, list<common.HostAddr>> (cpp.template = "std::unordered_map") zone_items,
=======
struct SplitZoneReq {
1: binary zone_name,
2: binary one_zone_name,
3: list<common.HostAddr> one_zone_hosts,
4: binary another_zone_name,
5: list<common.HostAddr> another_zone_hosts,
>>>>>>> support zone operations
}

struct RenameZoneReq {
Expand Down
4 changes: 0 additions & 4 deletions src/kvstore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@ nebula_add_library(

nebula_add_subdirectory(raftex)
nebula_add_subdirectory(wal)
<<<<<<< HEAD
nebula_add_subdirectory(stats)
nebula_add_subdirectory(test)
=======
#nebula_add_subdirectory(test)
>>>>>>> support zone operations

#nebula_add_subdirectory(plugins)

2 changes: 2 additions & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,14 @@ nebula_add_library(
add_dependencies(
meta_version_man_obj
meta_v1_thrift_obj
meta_v2_thrift_obj
)

set(meta_test_deps
$<TARGET_OBJECTS:mock_obj>
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v2_thrift_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
Expand Down
92 changes: 86 additions & 6 deletions src/meta/MetaVersionMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "meta/processors/job/JobUtils.h"
#include "meta/upgrade/MetaDataUpgrade.h"
#include "meta/upgrade/v1/MetaServiceUtilsV1.h"
#include "meta/upgrade/v2/MetaServiceUtilsV2.h"

DEFINE_bool(null_type, true, "set schema to support null type");
DEFINE_bool(print_info, false, "enable to print the rewrite data");
Expand Down Expand Up @@ -51,10 +52,10 @@ MetaVersion MetaVersionMan::getVersionByHost(kvstore::KVStore* kv) {
// static
bool MetaVersionMan::setMetaVersionToKV(kvstore::KVStore* kv) {
CHECK_NOTNULL(kv);
auto v2 = MetaVersion::V2;
auto v3 = MetaVersion::V3;
std::vector<kvstore::KV> data;
data.emplace_back(kMetaVersionKey,
std::string(reinterpret_cast<const char*>(&v2), sizeof(MetaVersion)));
std::string(reinterpret_cast<const char*>(&v3), sizeof(MetaVersion)));
bool ret = true;
folly::Baton<true, std::atomic> baton;
kv->asyncMultiPut(
Expand All @@ -63,7 +64,7 @@ bool MetaVersionMan::setMetaVersionToKV(kvstore::KVStore* kv) {
LOG(ERROR) << "Put failed, error: " << static_cast<int32_t>(code);
ret = false;
} else {
LOG(INFO) << "Write meta version 2 succeeds";
LOG(INFO) << "Write meta version 3 succeeds";
}
baton.post();
});
Expand All @@ -80,7 +81,28 @@ Status MetaVersionMan::updateMetaV1ToV2(kvstore::KVStore* kv) {
LOG(ERROR) << "Create snapshot failed: " << snapshot;
return Status::Error("Create snapshot failed");
}
auto status = doUpgrade(kv);
auto status = doUpgradeV1ToV2(kv);
if (!status.ok()) {
// rollback by snapshot
return status;
}
// delete snapshot file
auto dmRet = kv->dropCheckpoint(kDefaultSpaceId, snapshot);
if (dmRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Delete snapshot: " << snapshot << " failed, You need to delete it manually";
}
return Status::OK();
}

Status MetaVersionMan::updateMetaV2ToV3(kvstore::KVStore* kv) {
CHECK_NOTNULL(kv);
auto snapshot = folly::format("META_UPGRADE_SNAPSHOT_{}", MetaKeyUtils::genTimestampStr()).str();
auto meteRet = kv->createCheckpoint(kDefaultSpaceId, snapshot);
if (meteRet.isLeftType()) {
LOG(ERROR) << "Create snapshot failed: " << snapshot;
return Status::Error("Create snapshot failed");
}
auto status = doUpgradeV2ToV3(kv);
if (!status.ok()) {
// rollback by snapshot
return status;
Expand All @@ -94,7 +116,7 @@ Status MetaVersionMan::updateMetaV1ToV2(kvstore::KVStore* kv) {
}

// static
Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
MetaDataUpgrade upgrader(kv);
{
// kSpacesTable
Expand All @@ -105,7 +127,7 @@ Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
Status status = Status::OK();
while (iter->valid()) {
if (FLAGS_print_info) {
upgrader.printSpaces(iter->val());
upgrader.printSpacesV1(iter->val());
}
status = upgrader.rewriteSpaces(iter->key(), iter->val());
if (!status.ok()) {
Expand Down Expand Up @@ -158,6 +180,7 @@ Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
}
}
}

{
// kLeadersTable
auto prefix = nebula::meta::v1::kLeadersTable;
Expand All @@ -178,6 +201,7 @@ Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
}
}
}

{
// kTagsTable
auto prefix = nebula::meta::v1::kTagsTable;
Expand Down Expand Up @@ -334,5 +358,61 @@ Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
}
}

Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) {
MetaDataUpgrade upgrader(kv);
// Step 1: Upgrade HeartBeat into machine list
{
const auto& prefix = MetaKeyUtils::hostPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get active hosts failed";
return Status::Error("Get hosts failed");
}

std::vector<kvstore::KV> data;
while (iter->valid()) {
// Save the machine information
auto host = MetaKeyUtils::parseHostKey(iter->key());
auto machineKey = MetaKeyUtils::machineKey(host.host, host.port);
data.emplace_back(std::move(machineKey), "");

// Save the zone information
auto zoneName = folly::stringPrintf("default_zone_%s_%d", host.host.c_str(), host.port);
auto zoneKey = MetaKeyUtils::zoneKey(std::move(zoneName));
auto zoneVal = MetaKeyUtils::zoneVal({host});
data.emplace_back(std::move(zoneKey), std::move(zoneVal));
iter->next();
}
}

// Step 2: Update Create space properties about Group
{
const auto& prefix = MetaKeyUtils::spacePrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get spaces failed";
return Status::Error("Get spaces failed");
}

while (iter->valid()) {
if (FLAGS_print_info) {
upgrader.printSpacesV2(iter->val());
}
auto spaceProperties = meta::v2::MetaServiceUtilsV2::parseSpace(iter->val());
if (spaceProperties.group_name_ref().has_value()) {
Status status = upgrader.rewriteSpacesV2ToV3(iter->key(), iter->val());
if (!status.ok()) {
LOG(ERROR) << status;
return status;
}
}
iter->next();
}
}
return Status::OK();
}

} // namespace meta
} // namespace nebula
9 changes: 7 additions & 2 deletions src/meta/MetaVersionMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ enum class MetaVersion {
UNKNOWN = 0,
V1 = 1,
V2 = 2,
V3 = 3,
};

/**
Expand All @@ -32,13 +33,17 @@ class MetaVersionMan final {

static Status updateMetaV1ToV2(kvstore::KVStore* kv);

static Status updateMetaV2ToV3(kvstore::KVStore* kv);

private:
static MetaVersion getVersionByHost(kvstore::KVStore* kv);

static Status doUpgrade(kvstore::KVStore* kv);
static Status doUpgradeV1ToV2(kvstore::KVStore* kv);

static Status doUpgradeV2ToV3(kvstore::KVStore* kv);
};

} // namespace meta
} // namespace nebula

#endif // META_ROOTUSERMAN_H_
#endif // META_METAVERSIONMAN_H_
3 changes: 3 additions & 0 deletions src/meta/upgrade/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ nebula_add_library(
meta_data_upgrade_obj OBJECT
MetaDataUpgrade.cpp
v1/MetaServiceUtilsV1.cpp
v2/MetaServiceUtilsV2.cpp
)

add_dependencies(
meta_data_upgrade_obj
meta_v1_thrift_obj
meta_v2_thrift_obj
)

nebula_add_subdirectory(v1)
nebula_add_subdirectory(v2)
Loading

0 comments on commit d887631

Please sign in to comment.