From aabff083f994709811433b30c061b2a375a23ff4 Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Mon, 6 Dec 2021 18:09:17 +0800 Subject: [PATCH 1/8] add db-upgrade V3 --- src/tools/db-upgrade/CMakeLists.txt | 1 + src/tools/db-upgrade/DbUpgrader.cpp | 77 ++++++++++++++++++++++- src/tools/db-upgrade/DbUpgrader.h | 5 ++ src/tools/db-upgrade/DbUpgraderTool.cpp | 9 +-- src/tools/db-upgrade/NebulaKeyUtilsV3.cpp | 22 +++++++ src/tools/db-upgrade/NebulaKeyUtilsV3.h | 17 +++++ 6 files changed, 126 insertions(+), 5 deletions(-) create mode 100644 src/tools/db-upgrade/NebulaKeyUtilsV3.cpp create mode 100644 src/tools/db-upgrade/NebulaKeyUtilsV3.h diff --git a/src/tools/db-upgrade/CMakeLists.txt b/src/tools/db-upgrade/CMakeLists.txt index d43c203ab71..2e9c098b935 100644 --- a/src/tools/db-upgrade/CMakeLists.txt +++ b/src/tools/db-upgrade/CMakeLists.txt @@ -5,6 +5,7 @@ nebula_add_executable( DbUpgraderTool.cpp NebulaKeyUtilsV1.cpp NebulaKeyUtilsV2.cpp + NebulaKeyUtilsV3.cpp DbUpgrader.cpp OBJECTS $ diff --git a/src/tools/db-upgrade/DbUpgrader.cpp b/src/tools/db-upgrade/DbUpgrader.cpp index 40c638a6354..76e6ac06bc1 100644 --- a/src/tools/db-upgrade/DbUpgrader.cpp +++ b/src/tools/db-upgrade/DbUpgrader.cpp @@ -11,6 +11,7 @@ #include "common/utils/NebulaKeyUtils.h" #include "tools/db-upgrade/NebulaKeyUtilsV1.h" #include "tools/db-upgrade/NebulaKeyUtilsV2.h" +#include "tools/db-upgrade/NebulaKeyUtilsV3.h" DEFINE_string(src_db_path, "", @@ -882,6 +883,78 @@ std::string UpgraderSpace::encodeRowVal(const RowReader* reader, return std::move(rowWrite).moveEncodedStr(); } +void UpgraderSpace::runPartV3() { + std::chrono::milliseconds take_dura{10}; + if (auto pId = partQueue_.try_take_for(take_dura)) { + PartitionID partId = *pId; + // Handle vertex and edge, if there is an index, generate index data + LOG(INFO) << "Start to handle vertex/edge/index data in space id " << spaceId_ << " part id " + << partId; + auto prefix = NebulaKeyUtilsV3::partTagPrefix(partId); + std::unique_ptr iter; + auto retCode = readEngine_->prefix(prefix, &iter); + if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Space id " << spaceId_ << " part " << partId << " no found!"; + LOG(ERROR) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " + << partId << " failed"; + + auto unFinishedPart = --unFinishedPart_; + if (unFinishedPart == 0) { + // all parts has finished + LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id " + << spaceId_ << " finished"; + } else { + pool_->add(std::bind(&UpgraderSpace::runPartV3, this)); + } + return; + } + std::vector data; + std::string lastVertexKey = ""; + while (iter && iter->valid()) { + auto vertex = NebulaKeyUtilsV3::getVertexKey(iter->key()); + if (vertex == lastVertexKey) { + continue; + } + data.emplace_back(vertex, ""); + lastVertexKey = vertex; + } + auto code = writeEngine_->multiPut(data); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(FATAL) << "Write multi put in space id " << spaceId_ << " part id " << partId + << " failed."; + } + data.clear(); + LOG(INFO) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " << partId + << " succeed"; + + auto unFinishedPart = --unFinishedPart_; + if (unFinishedPart == 0) { + // all parts has finished + LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id " + << spaceId_ << " finished."; + } else { + pool_->add(std::bind(&UpgraderSpace::runPartV3, this)); + } + } else { + LOG(INFO) << "Handle vertex/edge/index of parts data in space id " << spaceId_ << " finished"; + } +} +void UpgraderSpace::doProcessV3() { + LOG(INFO) << "Start to handle data in space id " << spaceId_; + // Parallel process part + auto partConcurrency = std::min(static_cast(FLAGS_max_concurrent_parts), parts_.size()); + LOG(INFO) << "Max concurrent parts: " << partConcurrency; + unFinishedPart_ = parts_.size(); + + LOG(INFO) << "Start to handle vertex/edge/index of parts data in space id " << spaceId_; + for (size_t i = 0; i < partConcurrency; ++i) { + pool_->add(std::bind(&UpgraderSpace::runPartV3, this)); + } + + while (unFinishedPart_ != 0) { + sleep(10); + } +} std::vector UpgraderSpace::indexVertexKeys( PartitionID partId, VertexID& vId, @@ -1096,8 +1169,10 @@ void DbUpgrader::doSpace() { << " begin"; if (FLAGS_upgrade_version == 1) { upgraderSpaceIter->doProcessV1(); - } else { + } else if (FLAGS_upgrade_version == 2) { upgraderSpaceIter->doProcessV2(); + } else { + upgraderSpaceIter->doProcessV3(); } auto ret = upgraderSpaceIter->copyWal(); diff --git a/src/tools/db-upgrade/DbUpgrader.h b/src/tools/db-upgrade/DbUpgrader.h index 6851893f739..db711fc2c63 100644 --- a/src/tools/db-upgrade/DbUpgrader.h +++ b/src/tools/db-upgrade/DbUpgrader.h @@ -55,6 +55,9 @@ class UpgraderSpace { // Processing v2 Rc data upgrade to v2 Ga void doProcessV2(); + // Processing v2 Ga data upgrade to v3 + void doProcessV3(); + // Perform manual compact void doCompaction(); @@ -111,6 +114,8 @@ class UpgraderSpace { void runPartV2(); + void runPartV3(); + public: // Source data path std::string srcPath_; diff --git a/src/tools/db-upgrade/DbUpgraderTool.cpp b/src/tools/db-upgrade/DbUpgraderTool.cpp index 94b35c51d22..3e6a03b7229 100644 --- a/src/tools/db-upgrade/DbUpgraderTool.cpp +++ b/src/tools/db-upgrade/DbUpgraderTool.cpp @@ -39,10 +39,11 @@ void printHelp() { A list of meta severs' ip:port separated by comma. Default: 127.0.0.1:45500 - --upgrade_version=<1|2> - This tool can only upgrade 1.x data or 2.0 RC data. + --upgrade_version=<1|2|3> + This tool can only upgrade 1.x data, 2.0 RC, or 2.0 GA data. When the value is 1, upgrade the data from 1.x to 2.0 GA. When the value is 2, upgrade the data from 2.0 RC to 2.0 GA. + When the Value is 3, upgrade the data from 2.0 GA to 3.0 Default: 0 optional: @@ -164,9 +165,9 @@ int main(int argc, char* argv[]) { CHECK_NOTNULL(schemaMan); CHECK_NOTNULL(indexMan); - if (FLAGS_upgrade_version != 1 && FLAGS_upgrade_version != 2) { + if (FLAGS_upgrade_version != 1 && FLAGS_upgrade_version != 2 && FLAGS_upgrade_version != 3) { LOG(ERROR) << "Flag upgrade_version : " << FLAGS_upgrade_version - << " illegal, upgrade_version can only be 1 or 2"; + << " illegal, upgrade_version can only be 1,2,3"; return EXIT_FAILURE; } LOG(INFO) << "Prepare phase end"; diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp b/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp new file mode 100644 index 00000000000..8ef32c84d50 --- /dev/null +++ b/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp @@ -0,0 +1,22 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "tools/db-upgrade/NebulaKeyUtilsV3.h" + +namespace nebula { +std::string NebulaKeyUtilsV3::partTagPrefix(PartitionID partId) { + PartitionID item = (partId << kPartitionOffset) | static_cast(kTag_); + std::string key; + key.reserve(sizeof(PartitionID)); + key.append(reinterpret_cast(&item), sizeof(PartitionID)); + return key; +} +std::string NebulaKeyUtilsV3::getVertexKey(folly::StringPiece tagKey) { + std::string key = tagKey.toString(); + key[3] = static_cast(kVertex); + key.resize(key.size() - sizeof(TagID)); + return key; +} +} // namespace nebula diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV3.h b/src/tools/db-upgrade/NebulaKeyUtilsV3.h new file mode 100644 index 00000000000..5b5f36c2536 --- /dev/null +++ b/src/tools/db-upgrade/NebulaKeyUtilsV3.h @@ -0,0 +1,17 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ +#pragma once +#include "common/utils/Types.h" +namespace nebula { +class NebulaKeyUtilsV3 { + public: + static std::string partTagPrefix(PartitionID partId); + static std::string getVertexKey(folly::StringPiece tagKey); + + private: + enum NebulaKeyTypeV3 : uint32_t { kTag_ = 0x00000001, kVertex = 0x00000007 }; +}; + +} // namespace nebula From 1f51f3b4fc51c7630873d19763bbafcecdd825ba Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Wed, 15 Dec 2021 16:55:40 +0800 Subject: [PATCH 2/8] use ingest --- src/tools/db-upgrade/DbUpgrader.cpp | 40 +++++++++++++++++++++++++---- src/tools/db-upgrade/DbUpgrader.h | 3 +++ 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/src/tools/db-upgrade/DbUpgrader.cpp b/src/tools/db-upgrade/DbUpgrader.cpp index 76e6ac06bc1..8124383eb2f 100644 --- a/src/tools/db-upgrade/DbUpgrader.cpp +++ b/src/tools/db-upgrade/DbUpgrader.cpp @@ -9,6 +9,7 @@ #include "common/fs/FileUtils.h" #include "common/utils/IndexKeyUtils.h" #include "common/utils/NebulaKeyUtils.h" +#include "rocksdb/sst_file_writer.h" #include "tools/db-upgrade/NebulaKeyUtilsV1.h" #include "tools/db-upgrade/NebulaKeyUtilsV2.h" #include "tools/db-upgrade/NebulaKeyUtilsV3.h" @@ -908,6 +909,29 @@ void UpgraderSpace::runPartV3() { } return; } + auto write_sst = [&, this](const std::vector& data) { + ::rocksdb::Options option; + option.create_if_missing = true; + option.compression = ::rocksdb::CompressionType::kNoCompression; + ::rocksdb::SstFileWriter sst_file_writer(::rocksdb::EnvOptions(), option); + std::string file = ::fmt::format( + ".nebula_upgrade.space-{}.part-{}.{}.sst", spaceId_, partId, std::time(nullptr)); + ::rocksdb::Status s = sst_file_writer.Open(file); + if (!s.ok()) { + LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":" + << s.code(); + } + for (auto item : data) { + sst_file_writer.Put(item.first, item.second); + } + s = sst_file_writer.Finish(); + if (!s.ok()) { + LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":" + << s.code(); + } + std::lock_guard lck(this->ingest_sst_file_mut_); + ingest_sst_file_.push_back(file); + }; std::vector data; std::string lastVertexKey = ""; while (iter && iter->valid()) { @@ -917,13 +941,15 @@ void UpgraderSpace::runPartV3() { } data.emplace_back(vertex, ""); lastVertexKey = vertex; + if (data.size() >= 100000) { + write_sst(data); + data.clear(); + } } - auto code = writeEngine_->multiPut(data); - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(FATAL) << "Write multi put in space id " << spaceId_ << " part id " << partId - << " failed."; + if (!data.empty()) { + write_sst(data); + data.clear(); } - data.clear(); LOG(INFO) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " << partId << " succeed"; @@ -954,6 +980,10 @@ void UpgraderSpace::doProcessV3() { while (unFinishedPart_ != 0) { sleep(10); } + auto code = readEngine_->ingest(ingest_sst_file_, true); + if (code != ::nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(FATAL) << "Faild upgrade V3 when ingest sst file:" << static_cast(code); + } } std::vector UpgraderSpace::indexVertexKeys( PartitionID partId, diff --git a/src/tools/db-upgrade/DbUpgrader.h b/src/tools/db-upgrade/DbUpgrader.h index db711fc2c63..982af408182 100644 --- a/src/tools/db-upgrade/DbUpgrader.h +++ b/src/tools/db-upgrade/DbUpgrader.h @@ -164,6 +164,9 @@ class UpgraderSpace { folly::UnboundedBlockingQueue partQueue_; std::atomic unFinishedPart_; + + std::mutex ingest_sst_file_mut_; + std::vector ingest_sst_file_; }; // Upgrade one data path in storage conf From 84a3581a575239163889f2270fe4dab0e1914c0d Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Tue, 21 Dec 2021 17:39:14 +0800 Subject: [PATCH 3/8] modify upgrade args --- src/tools/db-upgrade/DbUpgrader.cpp | 19 +++++++++++-------- src/tools/db-upgrade/DbUpgrader.h | 2 +- src/tools/db-upgrade/DbUpgraderTool.cpp | 16 ++++++++-------- src/tools/db-upgrade/NebulaKeyUtilsV3.cpp | 3 +++ src/tools/db-upgrade/NebulaKeyUtilsV3.h | 2 ++ 5 files changed, 25 insertions(+), 17 deletions(-) diff --git a/src/tools/db-upgrade/DbUpgrader.cpp b/src/tools/db-upgrade/DbUpgrader.cpp index 8124383eb2f..f156d113e27 100644 --- a/src/tools/db-upgrade/DbUpgrader.cpp +++ b/src/tools/db-upgrade/DbUpgrader.cpp @@ -24,10 +24,11 @@ DEFINE_string(dst_db_path, "multi paths should be split by comma"); DEFINE_string(upgrade_meta_server, "127.0.0.1:45500", "Meta servers' address."); DEFINE_uint32(write_batch_num, 100, "The size of the batch written to rocksdb"); -DEFINE_uint32(upgrade_version, - 0, - "When the value is 1, upgrade the data from 1.x to 2.0 GA. " - "When the value is 2, upgrade the data from 2.0 RC to 2.0 GA."); +DEFINE_string(upgrade_version, + "", + "When the value is 1:2, upgrade the data from 1.x to 2.0 GA. " + "When the value is 2RC:2, upgrade the data from 2.0 RC to 2.0 GA." + "When the value is 2:3, upgrade the data from "); DEFINE_bool(compactions, true, "When the upgrade of the space is completed, " @@ -982,7 +983,7 @@ void UpgraderSpace::doProcessV3() { } auto code = readEngine_->ingest(ingest_sst_file_, true); if (code != ::nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(FATAL) << "Faild upgrade V3 when ingest sst file:" << static_cast(code); + LOG(FATAL) << "Faild upgrade 2:3 when ingest sst file:" << static_cast(code); } } std::vector UpgraderSpace::indexVertexKeys( @@ -1197,12 +1198,14 @@ void DbUpgrader::doSpace() { LOG(INFO) << "Upgrade from path " << upgraderSpaceIter->srcPath_ << " space id " << upgraderSpaceIter->entry_ << " to path " << upgraderSpaceIter->dstPath_ << " begin"; - if (FLAGS_upgrade_version == 1) { + if (FLAGS_upgrade_version == "1:2") { upgraderSpaceIter->doProcessV1(); - } else if (FLAGS_upgrade_version == 2) { + } else if (FLAGS_upgrade_version == "2RC:2") { upgraderSpaceIter->doProcessV2(); - } else { + } else if (FLAGS_upgrade_version == "2:3") { upgraderSpaceIter->doProcessV3(); + } else { + LOG(FATAL) << "error upgrade version " << FLAGS_upgrade_version; } auto ret = upgraderSpaceIter->copyWal(); diff --git a/src/tools/db-upgrade/DbUpgrader.h b/src/tools/db-upgrade/DbUpgrader.h index 982af408182..96c77dcacc0 100644 --- a/src/tools/db-upgrade/DbUpgrader.h +++ b/src/tools/db-upgrade/DbUpgrader.h @@ -23,7 +23,7 @@ DECLARE_string(src_db_path); DECLARE_string(dst_db_path); DECLARE_string(upgrade_meta_server); DECLARE_uint32(write_batch_num); -DECLARE_uint32(upgrade_version); +DECLARE_string(upgrade_version); DECLARE_bool(compactions); DECLARE_uint32(max_concurrent_parts); DECLARE_uint32(max_concurrent_spaces); diff --git a/src/tools/db-upgrade/DbUpgraderTool.cpp b/src/tools/db-upgrade/DbUpgraderTool.cpp index 3e6a03b7229..f36ba943546 100644 --- a/src/tools/db-upgrade/DbUpgraderTool.cpp +++ b/src/tools/db-upgrade/DbUpgraderTool.cpp @@ -39,12 +39,12 @@ void printHelp() { A list of meta severs' ip:port separated by comma. Default: 127.0.0.1:45500 - --upgrade_version=<1|2|3> + --upgrade_version=<1:2|2RC:2|2:3> This tool can only upgrade 1.x data, 2.0 RC, or 2.0 GA data. - When the value is 1, upgrade the data from 1.x to 2.0 GA. - When the value is 2, upgrade the data from 2.0 RC to 2.0 GA. - When the Value is 3, upgrade the data from 2.0 GA to 3.0 - Default: 0 + 1:2 upgrade the data from 1.x to 2.0GA + 2RC:2 upgrade the data from 2.0RC to 2.0GA + 2:3 upgrade the data from 2.0GA to 3.0 + Default: "" optional: --write_batch_num= @@ -165,9 +165,9 @@ int main(int argc, char* argv[]) { CHECK_NOTNULL(schemaMan); CHECK_NOTNULL(indexMan); - if (FLAGS_upgrade_version != 1 && FLAGS_upgrade_version != 2 && FLAGS_upgrade_version != 3) { - LOG(ERROR) << "Flag upgrade_version : " << FLAGS_upgrade_version - << " illegal, upgrade_version can only be 1,2,3"; + std::vector versions = {"1:2", "2RC:2", "2:3"}; + if (std::find(versions.begin(), versions.end(), FLAGS_upgrade_version) == versions.end()) { + LOG(ERROR) << "Flag upgrade_version : " << FLAGS_upgrade_version; return EXIT_FAILURE; } LOG(INFO) << "Prepare phase end"; diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp b/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp index 8ef32c84d50..a42a54ca02d 100644 --- a/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp +++ b/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp @@ -19,4 +19,7 @@ std::string NebulaKeyUtilsV3::getVertexKey(folly::StringPiece tagKey) { key.resize(key.size() - sizeof(TagID)); return key; } +std::string NebulaKeyUtilsV3::dataVersionKey() { return "\xFF\xFF\xFF\xFF"; } +std::string NebulaKeyUtilsV3::dataVersionValue() { return "3.0"; } + } // namespace nebula diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV3.h b/src/tools/db-upgrade/NebulaKeyUtilsV3.h index 5b5f36c2536..6b6b40d2e51 100644 --- a/src/tools/db-upgrade/NebulaKeyUtilsV3.h +++ b/src/tools/db-upgrade/NebulaKeyUtilsV3.h @@ -9,6 +9,8 @@ class NebulaKeyUtilsV3 { public: static std::string partTagPrefix(PartitionID partId); static std::string getVertexKey(folly::StringPiece tagKey); + static std::string dataVersionKey(); + static std::string dataVersionValue(); private: enum NebulaKeyTypeV3 : uint32_t { kTag_ = 0x00000001, kVertex = 0x00000007 }; From 3a0412bfeb36b07e4d082801c14a25c1b42fd976 Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Tue, 21 Dec 2021 18:37:33 +0800 Subject: [PATCH 4/8] write data version key --- src/tools/db-upgrade/DbUpgrader.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/tools/db-upgrade/DbUpgrader.cpp b/src/tools/db-upgrade/DbUpgrader.cpp index f156d113e27..966a5535879 100644 --- a/src/tools/db-upgrade/DbUpgrader.cpp +++ b/src/tools/db-upgrade/DbUpgrader.cpp @@ -86,7 +86,7 @@ Status UpgraderSpace::initSpace(const std::string& sId) { // Use readonly rocksdb readEngine_.reset(new nebula::kvstore::RocksEngine( - spaceId_, spaceVidLen_, srcPath_, "", nullptr, nullptr, true)); + spaceId_, spaceVidLen_, srcPath_, "", nullptr, nullptr, false)); writeEngine_.reset(new nebula::kvstore::RocksEngine(spaceId_, spaceVidLen_, dstPath_)); parts_.clear(); @@ -985,6 +985,7 @@ void UpgraderSpace::doProcessV3() { if (code != ::nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(FATAL) << "Faild upgrade 2:3 when ingest sst file:" << static_cast(code); } + readEngine_->put(NebulaKeyUtilsV3::dataVersionKey(), NebulaKeyUtilsV3::dataVersionValue()); } std::vector UpgraderSpace::indexVertexKeys( PartitionID partId, From 6399d5ae60e9ec5ed6709a40a9d2851b5e308ed7 Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Thu, 23 Dec 2021 10:52:08 +0800 Subject: [PATCH 5/8] address wenhao's comment --- src/tools/db-upgrade/DbUpgrader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tools/db-upgrade/DbUpgrader.cpp b/src/tools/db-upgrade/DbUpgrader.cpp index 966a5535879..e3ad7f6c583 100644 --- a/src/tools/db-upgrade/DbUpgrader.cpp +++ b/src/tools/db-upgrade/DbUpgrader.cpp @@ -28,7 +28,7 @@ DEFINE_string(upgrade_version, "", "When the value is 1:2, upgrade the data from 1.x to 2.0 GA. " "When the value is 2RC:2, upgrade the data from 2.0 RC to 2.0 GA." - "When the value is 2:3, upgrade the data from "); + "When the value is 2:3, upgrade the data from 2.0 GA to 3.0 ."); DEFINE_bool(compactions, true, "When the upgrade of the space is completed, " From 7145f6b4648c9acbce4d4e75a903a7d90ed412ac Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Thu, 23 Dec 2021 15:12:23 +0800 Subject: [PATCH 6/8] address some comment --- src/tools/db-upgrade/DbUpgrader.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/tools/db-upgrade/DbUpgrader.cpp b/src/tools/db-upgrade/DbUpgrader.cpp index e3ad7f6c583..7f2aee87c5d 100644 --- a/src/tools/db-upgrade/DbUpgrader.cpp +++ b/src/tools/db-upgrade/DbUpgrader.cpp @@ -923,7 +923,11 @@ void UpgraderSpace::runPartV3() { << s.code(); } for (auto item : data) { - sst_file_writer.Put(item.first, item.second); + s = sst_file_writer.Put(item.first, item.second); + if (!s.ok()) { + LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":" + << s.code(); + } } s = sst_file_writer.Finish(); if (!s.ok()) { @@ -938,6 +942,7 @@ void UpgraderSpace::runPartV3() { while (iter && iter->valid()) { auto vertex = NebulaKeyUtilsV3::getVertexKey(iter->key()); if (vertex == lastVertexKey) { + iter->next(); continue; } data.emplace_back(vertex, ""); @@ -946,6 +951,7 @@ void UpgraderSpace::runPartV3() { write_sst(data); data.clear(); } + iter->next(); } if (!data.empty()) { write_sst(data); From 7b92dabfa375e2a546045b8dd8ed05fb3bc10b0d Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Thu, 23 Dec 2021 15:16:28 +0800 Subject: [PATCH 7/8] address some comment --- src/common/utils/NebulaKeyUtils.cpp | 2 ++ src/common/utils/NebulaKeyUtils.h | 2 ++ src/tools/db-upgrade/DbUpgrader.cpp | 2 +- src/tools/db-upgrade/NebulaKeyUtilsV3.cpp | 1 - src/tools/db-upgrade/NebulaKeyUtilsV3.h | 1 - 5 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/common/utils/NebulaKeyUtils.cpp b/src/common/utils/NebulaKeyUtils.cpp index 0323269cbf0..47546f08984 100644 --- a/src/common/utils/NebulaKeyUtils.cpp +++ b/src/common/utils/NebulaKeyUtils.cpp @@ -261,4 +261,6 @@ std::string NebulaKeyUtils::adminTaskKey(int32_t seqId, JobID jobId, TaskID task return key; } +std::string NebulaKeyUtils::dataVersionKey() { return "\xFF\xFF\xFF\xFF"; } + } // namespace nebula diff --git a/src/common/utils/NebulaKeyUtils.h b/src/common/utils/NebulaKeyUtils.h index 2cb53d54af5..2bbafe7dc70 100644 --- a/src/common/utils/NebulaKeyUtils.h +++ b/src/common/utils/NebulaKeyUtils.h @@ -270,6 +270,8 @@ class NebulaKeyUtils final { static std::string adminTaskKey(int32_t seqId, JobID jobId, TaskID taskId); + static std::string dataVersionKey(); + static_assert(sizeof(NebulaKeyType) == sizeof(PartitionID)); private: diff --git a/src/tools/db-upgrade/DbUpgrader.cpp b/src/tools/db-upgrade/DbUpgrader.cpp index 7f2aee87c5d..8e738e45202 100644 --- a/src/tools/db-upgrade/DbUpgrader.cpp +++ b/src/tools/db-upgrade/DbUpgrader.cpp @@ -991,7 +991,7 @@ void UpgraderSpace::doProcessV3() { if (code != ::nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(FATAL) << "Faild upgrade 2:3 when ingest sst file:" << static_cast(code); } - readEngine_->put(NebulaKeyUtilsV3::dataVersionKey(), NebulaKeyUtilsV3::dataVersionValue()); + readEngine_->put(NebulaKeyUtils::dataVersionKey(), NebulaKeyUtilsV3::dataVersionValue()); } std::vector UpgraderSpace::indexVertexKeys( PartitionID partId, diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp b/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp index a42a54ca02d..eb9dfcf0c92 100644 --- a/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp +++ b/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp @@ -19,7 +19,6 @@ std::string NebulaKeyUtilsV3::getVertexKey(folly::StringPiece tagKey) { key.resize(key.size() - sizeof(TagID)); return key; } -std::string NebulaKeyUtilsV3::dataVersionKey() { return "\xFF\xFF\xFF\xFF"; } std::string NebulaKeyUtilsV3::dataVersionValue() { return "3.0"; } } // namespace nebula diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV3.h b/src/tools/db-upgrade/NebulaKeyUtilsV3.h index 6b6b40d2e51..55862dcd726 100644 --- a/src/tools/db-upgrade/NebulaKeyUtilsV3.h +++ b/src/tools/db-upgrade/NebulaKeyUtilsV3.h @@ -9,7 +9,6 @@ class NebulaKeyUtilsV3 { public: static std::string partTagPrefix(PartitionID partId); static std::string getVertexKey(folly::StringPiece tagKey); - static std::string dataVersionKey(); static std::string dataVersionValue(); private: From fdbf0c4d1b97349ff7aa896d93b9db1fc6b702b7 Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Mon, 27 Dec 2021 11:03:32 +0800 Subject: [PATCH 8/8] format --- src/common/utils/NebulaKeyUtils.cpp | 4 +++- src/tools/db-upgrade/NebulaKeyUtilsV3.cpp | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/common/utils/NebulaKeyUtils.cpp b/src/common/utils/NebulaKeyUtils.cpp index 47546f08984..0a91dd7b622 100644 --- a/src/common/utils/NebulaKeyUtils.cpp +++ b/src/common/utils/NebulaKeyUtils.cpp @@ -261,6 +261,8 @@ std::string NebulaKeyUtils::adminTaskKey(int32_t seqId, JobID jobId, TaskID task return key; } -std::string NebulaKeyUtils::dataVersionKey() { return "\xFF\xFF\xFF\xFF"; } +std::string NebulaKeyUtils::dataVersionKey() { + return "\xFF\xFF\xFF\xFF"; +} } // namespace nebula diff --git a/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp b/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp index eb9dfcf0c92..f6105094f4f 100644 --- a/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp +++ b/src/tools/db-upgrade/NebulaKeyUtilsV3.cpp @@ -19,6 +19,8 @@ std::string NebulaKeyUtilsV3::getVertexKey(folly::StringPiece tagKey) { key.resize(key.size() - sizeof(TagID)); return key; } -std::string NebulaKeyUtilsV3::dataVersionValue() { return "3.0"; } +std::string NebulaKeyUtilsV3::dataVersionValue() { + return "3.0"; +} } // namespace nebula