From 67e66751cd678f1f68fc5fd267af68c1671f3033 Mon Sep 17 00:00:00 2001 From: liwenhui-soul <38217397+liwenhui-soul@users.noreply.github.com> Date: Thu, 23 Dec 2021 13:00:57 +0800 Subject: [PATCH] refactor zone balance --- .../processors/job/ZoneBalanceJobExecutor.cpp | 259 +++++++++--------- .../processors/job/ZoneBalanceJobExecutor.h | 7 + 2 files changed, 142 insertions(+), 124 deletions(-) diff --git a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp index eb3abd1fd24..4059b5b18be 100644 --- a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp +++ b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp @@ -94,6 +94,129 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::updateMeta() { return ret; } +HostAddr ZoneBalanceJobExecutor::insertPartIntoZone( + std::map>* sortedZoneHosts, Zone* zone, PartitionID partId) { + std::vector& sortedHosts = sortedZoneHosts->operator[](zone->zoneName_); + sortedHosts.front()->parts_.emplace(partId); + zone->partNum_++; + HostAddr ha = sortedHosts.front()->host_; + for (size_t i = 0; i < sortedHosts.size() - 1; i++) { + if (sortedHosts[i]->parts_.size() >= sortedHosts[i + 1]->parts_.size()) { + std::swap(sortedHosts[i], sortedHosts[i + 1]); + } else { + break; + } + } + return ha; +} + +nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::rebalanceActiveZones( + std::vector* sortedActiveZones, + std::map>* sortedZoneHosts, + std::vector* tasks) { + std::vector& sortedActiveZonesRef = *sortedActiveZones; + std::map>& sortedZoneHostsRef = *sortedZoneHosts; + int32_t totalPartNum = 0; + int32_t avgPartNum = 0; + for (auto& z : sortedActiveZonesRef) { + totalPartNum += z->partNum_; + } + if (sortedActiveZonesRef.size() == 0) { + LOG(ERROR) << "rebalance error: no active zones"; + return nebula::cpp2::ErrorCode::E_NO_HOSTS; + } + avgPartNum = totalPartNum / sortedActiveZonesRef.size(); + int32_t remainder = totalPartNum - avgPartNum * sortedActiveZonesRef.size(); + int32_t leftBegin = 0; + int32_t leftEnd = 0; + int32_t rightBegin = 0; + int32_t rightEnd = sortedActiveZonesRef.size(); + for (size_t i = 0; i < sortedActiveZonesRef.size(); i++) { + if (avgPartNum <= sortedActiveZonesRef[i]->partNum_) { + leftEnd = i; + break; + } + } + for (size_t i = leftEnd; i < sortedActiveZonesRef.size(); i++) { + if (avgPartNum < sortedActiveZonesRef[i]->partNum_) { + rightBegin = i; + break; + } + } + auto findZoneToInsert = [&](PartitionID partId, const HostAddr& srcHost) -> bool { + for (int32_t leftIndex = leftBegin; leftIndex < leftEnd; leftIndex++) { + if (!sortedActiveZonesRef[leftIndex]->partExist(partId)) { + HostAddr dst = insertPartIntoZone(sortedZoneHosts, sortedActiveZonesRef[leftIndex], partId); + tasks->emplace_back( + jobId_, spaceInfo_.spaceId_, partId, srcHost, dst, kvstore_, adminClient_); + int32_t newLeftIndex = leftIndex; + for (; newLeftIndex < leftEnd - 1; newLeftIndex++) { + if (sortedActiveZonesRef[newLeftIndex]->partNum_ > + sortedActiveZonesRef[newLeftIndex + 1]->partNum_) { + std::swap(sortedActiveZonesRef[newLeftIndex], sortedActiveZonesRef[newLeftIndex + 1]); + } else { + break; + } + } + // if the zone's part reach the avgPartNum,it can't recieve parts any more + if (newLeftIndex == leftEnd - 1 && + sortedActiveZonesRef[newLeftIndex]->partNum_ >= avgPartNum) { + leftEnd--; + } + // all zones in left side have reached avgPartNum,and now some of them will take + // avgPartNum+1 if there still has remainder + if (leftBegin == leftEnd) { + leftEnd = rightBegin; + } + return true; + } + } + return false; + }; + for (int32_t right = rightBegin; right < rightEnd;) { + Zone* srcZone = sortedActiveZonesRef[right]; + // if remainder>0 some zones will hold avgPartNum+1 patrs, we prioritise choosing zones in right + // side to hold them + if (srcZone->partNum_ == avgPartNum + 1 && remainder) { + right++; + remainder--; + continue; + } + if (srcZone->partNum_ == avgPartNum) { + right++; + continue; + } + std::vector& sortedHosts = sortedZoneHostsRef[srcZone->zoneName_]; + int32_t hostIndex = sortedHosts.size() - 1; + // to find a part to move,we prioritise moving parts from who has the most + for (; hostIndex >= 0; hostIndex--) { + std::set& hostParts = sortedHosts[hostIndex]->parts_; + PartitionID movePart = -1; + for (PartitionID partId : hostParts) { + // to find a zone which does not contain the part in the left side to insert + bool matched = findZoneToInsert(partId, sortedHosts[hostIndex]->host_); + if (matched) { + movePart = partId; + break; + } + } + if (movePart != -1) { + hostParts.erase(movePart); + srcZone->partNum_--; + break; + } + } + for (int32_t i = hostIndex; i > 0; i--) { + if (sortedHosts[i]->parts_.size() <= sortedHosts[i - 1]->parts_.size()) { + std::swap(sortedHosts[i], sortedHosts[i - 1]); + } else { + break; + } + } + } + return nebula::cpp2::ErrorCode::SUCCEEDED; +} + /* first, move the lostZones' parts to the active zones * second, make balance for the active zones */ Status ZoneBalanceJobExecutor::buildBalancePlan() { @@ -142,23 +265,8 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() { return l->partNum_ < r->partNum_; }); - auto insertPartIntoZone = [&sortedZoneHosts](Zone* zone, PartitionID partId) -> HostAddr { - std::vector& sortedHosts = sortedZoneHosts[zone->zoneName_]; - sortedHosts.front()->parts_.emplace(partId); - zone->partNum_++; - HostAddr ha = sortedHosts.front()->host_; - for (size_t i = 0; i < sortedHosts.size() - 1; i++) { - if (sortedHosts[i]->parts_.size() >= sortedHosts[i + 1]->parts_.size()) { - std::swap(sortedHosts[i], sortedHosts[i + 1]); - } else { - break; - } - } - return ha; - }; - - auto chooseZoneToInsert = [&insertPartIntoZone, - &sortedActiveZones](PartitionID partId) -> HostAddr { + auto chooseZoneToInsert = + [this, &sortedActiveZones, &sortedZoneHosts](PartitionID partId) -> HostAddr { size_t index = 0; for (size_t i = 0; i < sortedActiveZones.size(); i++) { if (!sortedActiveZones[i]->partExist(partId)) { @@ -166,7 +274,7 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() { break; } } - HostAddr ha = insertPartIntoZone(sortedActiveZones[index], partId); + HostAddr ha = insertPartIntoZone(&sortedZoneHosts, sortedActiveZones[index], partId); for (size_t i = index; i < sortedActiveZones.size() - 1; i++) { if (sortedActiveZones[i]->partNum_ >= sortedActiveZones[i + 1]->partNum_) { std::swap(sortedActiveZones[i], sortedActiveZones[i + 1]); @@ -181,126 +289,29 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() { for (auto& zoneMapEntry : lostZones) { Zone* zone = zoneMapEntry.second; for (auto& hostMapEntry : zone->hosts_) { - for (PartitionID partId : hostMapEntry.second.parts_) { + const HostAddr& hostAddr = hostMapEntry.first; + Host& host = hostMapEntry.second; + for (PartitionID partId : host.parts_) { HostAddr dst = chooseZoneToInsert(partId); tasks.emplace_back( - jobId_, spaceInfo_.spaceId_, partId, hostMapEntry.first, dst, kvstore_, adminClient_); + jobId_, spaceInfo_.spaceId_, partId, hostAddr, dst, kvstore_, adminClient_); } - hostMapEntry.second.parts_.clear(); + host.parts_.clear(); } zone->calPartNum(); } // all parts of lost zones have moved to active zones, then rebalance the active zones - int32_t totalPartNum = 0; - int32_t avgPartNum = 0; - for (auto& z : sortedActiveZones) { - totalPartNum += z->partNum_; - } - if (sortedActiveZones.size() == 0) { - LOG(ERROR) << "rebalance error: no active zones"; - return {}; - } - avgPartNum = totalPartNum / sortedActiveZones.size(); - int32_t remainder = totalPartNum - avgPartNum * sortedActiveZones.size(); - int32_t leftBegin = 0; - int32_t leftEnd = 0; - int32_t rightBegin = 0; - int32_t rightEnd = sortedActiveZones.size(); - for (size_t i = 0; i < sortedActiveZones.size(); i++) { - if (avgPartNum <= sortedActiveZones[i]->partNum_) { - leftEnd = i; - break; - } - } - for (size_t i = leftEnd; i < sortedActiveZones.size(); i++) { - if (avgPartNum < sortedActiveZones[i]->partNum_) { - rightBegin = i; - break; - } - } - for (int32_t right = rightBegin; right < rightEnd;) { - Zone* srcZone = sortedActiveZones[right]; - // if remainder>0 some zones will hold avgPartNum+1 patrs, we prioritise taking the right side - // zones to hold them - if (srcZone->partNum_ == avgPartNum + 1 && remainder) { - right++; - remainder--; - continue; - } - if (srcZone->partNum_ == avgPartNum) { - right++; - continue; - } - std::vector& sortedHosts = sortedZoneHosts[srcZone->zoneName_]; - int32_t hostIndex = sortedHosts.size() - 1; - // to find a part to move,we prioritise moving parts from who has the most - for (; hostIndex >= 0; hostIndex--) { - std::set& hostParts = sortedHosts[hostIndex]->parts_; - PartitionID movePart = -1; - for (PartitionID partId : hostParts) { - bool matched = false; - // to find a zone which does not contain the part in the left side to insert - for (int32_t leftIndex = leftBegin; leftIndex < leftEnd; leftIndex++) { - if (!sortedActiveZones[leftIndex]->partExist(partId)) { - HostAddr dst = insertPartIntoZone(sortedActiveZones[leftIndex], partId); - tasks.emplace_back(jobId_, - spaceInfo_.spaceId_, - partId, - sortedHosts[hostIndex]->host_, - dst, - kvstore_, - adminClient_); - movePart = partId; - int32_t newLeftIndex = leftIndex; - for (; newLeftIndex < leftEnd - 1; newLeftIndex++) { - if (sortedActiveZones[newLeftIndex]->partNum_ > - sortedActiveZones[newLeftIndex + 1]->partNum_) { - std::swap(sortedActiveZones[newLeftIndex], sortedActiveZones[newLeftIndex + 1]); - } else { - break; - } - } - // if the zone's part reach the avgPartNum,it can't recieve parts any more - if (newLeftIndex == leftEnd - 1 && - sortedActiveZones[newLeftIndex]->partNum_ >= avgPartNum) { - leftEnd--; - } - // all zones in left side have reached avgPartNum,and now some of them will take - // avgPartNum+1 if there still has remainder - if (leftBegin == leftEnd) { - leftEnd = rightBegin; - } - matched = true; - break; - } - } - if (matched) { - break; - } - } - if (movePart != -1) { - hostParts.erase(movePart); - srcZone->partNum_--; - break; - } - } - for (int32_t i = hostIndex; i > 0; i--) { - if (sortedHosts[i]->parts_.size() <= sortedHosts[i - 1]->parts_.size()) { - std::swap(sortedHosts[i], sortedHosts[i - 1]); - } else { - break; - } - } - } - if (tasks.empty()) { + nebula::cpp2::ErrorCode rc = rebalanceActiveZones(&sortedActiveZones, &sortedZoneHosts, &tasks); + + if (tasks.empty() || rc != nebula::cpp2::ErrorCode::SUCCEEDED) { return Status::Balanced(); } plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_)); for (BalanceTask& task : tasks) { plan_->addTask(std::move(task)); } - nebula::cpp2::ErrorCode rc = plan_->saveInStore(); + rc = plan_->saveInStore(); if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) { return Status::Error("save balance zone plan failed"); } diff --git a/src/meta/processors/job/ZoneBalanceJobExecutor.h b/src/meta/processors/job/ZoneBalanceJobExecutor.h index 14df98e62ce..798675191b5 100644 --- a/src/meta/processors/job/ZoneBalanceJobExecutor.h +++ b/src/meta/processors/job/ZoneBalanceJobExecutor.h @@ -32,6 +32,13 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor { folly::Future executeInternal() override; Status buildBalancePlan() override; nebula::cpp2::ErrorCode updateMeta(); + HostAddr insertPartIntoZone(std::map>* sortedZoneHosts, + Zone* zone, + PartitionID partId); + nebula::cpp2::ErrorCode rebalanceActiveZones( + std::vector* sortedActiveZones, + std::map>* sortedZoneHosts, + std::vector* tasks); private: std::vector lostZones_;