Skip to content

Commit

Permalink
refactor zone balance
Browse files Browse the repository at this point in the history
  • Loading branch information
liwenhui-soul committed Dec 28, 2021
1 parent a9782f4 commit 67e6675
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 124 deletions.
259 changes: 135 additions & 124 deletions src/meta/processors/job/ZoneBalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,129 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::updateMeta() {
return ret;
}

/**
 * Places `partId` on the host at the front of the zone's host list and returns that host's address.
 *
 * The zone's part counter is incremented, and the chosen host is then shifted towards the back of
 * the list for as long as it holds at least as many parts as its right-hand neighbour, so the list
 * stays ordered by part count (assuming it was ordered on entry).
 *
 * @param sortedZoneHosts zone name -> hosts of that zone; looked up with operator[]
 * @param zone            zone that receives the part
 * @param partId          partition to place
 * @return address of the host that received the part
 */
HostAddr ZoneBalanceJobExecutor::insertPartIntoZone(
    std::map<std::string, std::vector<Host*>>* sortedZoneHosts, Zone* zone, PartitionID partId) {
  std::vector<Host*>& hosts = (*sortedZoneHosts)[zone->zoneName_];

  // The front host is the one that takes the new part.
  Host* receiver = hosts.front();
  receiver->parts_.emplace(partId);
  zone->partNum_++;
  HostAddr receiverAddr = receiver->host_;

  // Restore the ordering: push the receiver to the right past every host that does not hold more
  // parts than it does.
  for (size_t next = 1; next < hosts.size(); next++) {
    if (hosts[next - 1]->parts_.size() < hosts[next]->parts_.size()) {
      break;
    }
    std::swap(hosts[next - 1], hosts[next]);
  }
  return receiverAddr;
}

/**
 * Evens out the number of parts held by the active zones by moving parts from zones that hold
 * more than the average to zones that hold fewer, appending one BalanceTask per moved part.
 *
 * The zone list is split into three index ranges (the list is assumed to be sorted by ascending
 * partNum_ on entry):
 *   [leftBegin, leftEnd)   zones that may still receive parts
 *   [leftEnd, rightBegin)  zones that hold exactly the average
 *   [rightBegin, rightEnd) zones that hold more than the average and may give parts away
 *
 * @param sortedActiveZones active zones; reordered in place while parts are moved
 * @param sortedZoneHosts   zone name -> hosts of that zone; host lists are reordered in place
 * @param tasks             output, one task is appended for every part that is moved
 * @return E_NO_HOSTS when there is no active zone, SUCCEEDED otherwise
 */
nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::rebalanceActiveZones(
    std::vector<Zone*>* sortedActiveZones,
    std::map<std::string, std::vector<Host*>>* sortedZoneHosts,
    std::vector<BalanceTask>* tasks) {
  std::vector<Zone*>& sortedActiveZonesRef = *sortedActiveZones;
  std::map<std::string, std::vector<Host*>>& sortedZoneHostsRef = *sortedZoneHosts;
  // Reject the empty case first so that nothing below divides by zero.
  if (sortedActiveZonesRef.empty()) {
    LOG(ERROR) << "rebalance error: no active zones";
    return nebula::cpp2::ErrorCode::E_NO_HOSTS;
  }
  int32_t totalPartNum = 0;
  for (auto& z : sortedActiveZonesRef) {
    totalPartNum += z->partNum_;
  }
  // Keep the arithmetic signed instead of mixing int32_t with size_t.
  const int32_t zoneNum = static_cast<int32_t>(sortedActiveZonesRef.size());
  const int32_t avgPartNum = totalPartNum / zoneNum;
  int32_t remainder = totalPartNum - avgPartNum * zoneNum;
  int32_t leftBegin = 0;
  int32_t leftEnd = 0;
  int32_t rightBegin = 0;
  int32_t rightEnd = zoneNum;
  // leftEnd: first zone that already holds at least avgPartNum parts
  for (int32_t i = 0; i < zoneNum; i++) {
    if (avgPartNum <= sortedActiveZonesRef[i]->partNum_) {
      leftEnd = i;
      break;
    }
  }
  // rightBegin: first zone that holds more than avgPartNum parts
  for (int32_t i = leftEnd; i < zoneNum; i++) {
    if (avgPartNum < sortedActiveZonesRef[i]->partNum_) {
      rightBegin = i;
      break;
    }
  }
  // If no zone is below the average, the zones sitting exactly on the average are the only
  // possible receivers (they will take avgPartNum + 1). This is the same rule findZoneToInsert
  // applies once the left side is exhausted; without it the receiver range would start out empty
  // and a zone holding more than avgPartNum + 1 parts could never give any part away.
  if (leftBegin == leftEnd) {
    leftEnd = rightBegin;
  }
  // Tries to place partId into a receiver zone that does not contain it yet. On success a task
  // moving the part from srcHost is appended and the receiver range is updated.
  auto findZoneToInsert = [&](PartitionID partId, const HostAddr& srcHost) -> bool {
    for (int32_t leftIndex = leftBegin; leftIndex < leftEnd; leftIndex++) {
      if (!sortedActiveZonesRef[leftIndex]->partExist(partId)) {
        HostAddr dst = insertPartIntoZone(sortedZoneHosts, sortedActiveZonesRef[leftIndex], partId);
        tasks->emplace_back(
            jobId_, spaceInfo_.spaceId_, partId, srcHost, dst, kvstore_, adminClient_);
        // keep the receiver range ordered by part count
        int32_t newLeftIndex = leftIndex;
        for (; newLeftIndex < leftEnd - 1; newLeftIndex++) {
          if (sortedActiveZonesRef[newLeftIndex]->partNum_ >
              sortedActiveZonesRef[newLeftIndex + 1]->partNum_) {
            std::swap(sortedActiveZonesRef[newLeftIndex], sortedActiveZonesRef[newLeftIndex + 1]);
          } else {
            break;
          }
        }
        // if the zone's part count reaches avgPartNum, it can't receive parts any more
        if (newLeftIndex == leftEnd - 1 &&
            sortedActiveZonesRef[newLeftIndex]->partNum_ >= avgPartNum) {
          leftEnd--;
        }
        // all zones on the left side have reached avgPartNum, so from now on some of them will
        // take avgPartNum + 1 if there is still a remainder
        if (leftBegin == leftEnd) {
          leftEnd = rightBegin;
        }
        return true;
      }
    }
    return false;
  };
  for (int32_t right = rightBegin; right < rightEnd;) {
    Zone* srcZone = sortedActiveZonesRef[right];
    // if remainder > 0 some zones will hold avgPartNum + 1 parts; we prioritise choosing zones on
    // the right side to hold them
    if (srcZone->partNum_ == avgPartNum + 1 && remainder) {
      right++;
      remainder--;
      continue;
    }
    if (srcZone->partNum_ == avgPartNum) {
      right++;
      continue;
    }
    std::vector<Host*>& sortedHosts = sortedZoneHostsRef[srcZone->zoneName_];
    int32_t hostIndex = static_cast<int32_t>(sortedHosts.size()) - 1;
    // to find a part to move, we prioritise moving parts from the host that has the most
    for (; hostIndex >= 0; hostIndex--) {
      std::set<PartitionID>& hostParts = sortedHosts[hostIndex]->parts_;
      PartitionID movePart = -1;
      for (PartitionID partId : hostParts) {
        // to find a zone on the left side which does not contain the part yet
        bool matched = findZoneToInsert(partId, sortedHosts[hostIndex]->host_);
        if (matched) {
          movePart = partId;
          break;
        }
      }
      if (movePart != -1) {
        hostParts.erase(movePart);
        srcZone->partNum_--;
        break;
      }
    }
    if (hostIndex < 0) {
      // No part of this zone could be placed anywhere. Nothing has changed, so retrying the same
      // zone would loop forever; give up on it and continue with the next one.
      LOG(WARNING) << "rebalance: no part of zone " << srcZone->zoneName_
                   << " can be moved to another zone";
      right++;
      continue;
    }
    // the host that lost a part moves towards the front of its zone's host list
    for (int32_t i = hostIndex; i > 0; i--) {
      if (sortedHosts[i]->parts_.size() <= sortedHosts[i - 1]->parts_.size()) {
        std::swap(sortedHosts[i], sortedHosts[i - 1]);
      } else {
        break;
      }
    }
  }
  return nebula::cpp2::ErrorCode::SUCCEEDED;
}

/* first, move the lostZones' parts to the active zones
* second, make balance for the active zones */
Status ZoneBalanceJobExecutor::buildBalancePlan() {
Expand Down Expand Up @@ -142,31 +265,16 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
return l->partNum_ < r->partNum_;
});

auto insertPartIntoZone = [&sortedZoneHosts](Zone* zone, PartitionID partId) -> HostAddr {
std::vector<Host*>& sortedHosts = sortedZoneHosts[zone->zoneName_];
sortedHosts.front()->parts_.emplace(partId);
zone->partNum_++;
HostAddr ha = sortedHosts.front()->host_;
for (size_t i = 0; i < sortedHosts.size() - 1; i++) {
if (sortedHosts[i]->parts_.size() >= sortedHosts[i + 1]->parts_.size()) {
std::swap(sortedHosts[i], sortedHosts[i + 1]);
} else {
break;
}
}
return ha;
};

auto chooseZoneToInsert = [&insertPartIntoZone,
&sortedActiveZones](PartitionID partId) -> HostAddr {
auto chooseZoneToInsert =
[this, &sortedActiveZones, &sortedZoneHosts](PartitionID partId) -> HostAddr {
size_t index = 0;
for (size_t i = 0; i < sortedActiveZones.size(); i++) {
if (!sortedActiveZones[i]->partExist(partId)) {
index = i;
break;
}
}
HostAddr ha = insertPartIntoZone(sortedActiveZones[index], partId);
HostAddr ha = insertPartIntoZone(&sortedZoneHosts, sortedActiveZones[index], partId);
for (size_t i = index; i < sortedActiveZones.size() - 1; i++) {
if (sortedActiveZones[i]->partNum_ >= sortedActiveZones[i + 1]->partNum_) {
std::swap(sortedActiveZones[i], sortedActiveZones[i + 1]);
Expand All @@ -181,126 +289,29 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
for (auto& zoneMapEntry : lostZones) {
Zone* zone = zoneMapEntry.second;
for (auto& hostMapEntry : zone->hosts_) {
for (PartitionID partId : hostMapEntry.second.parts_) {
const HostAddr& hostAddr = hostMapEntry.first;
Host& host = hostMapEntry.second;
for (PartitionID partId : host.parts_) {
HostAddr dst = chooseZoneToInsert(partId);
tasks.emplace_back(
jobId_, spaceInfo_.spaceId_, partId, hostMapEntry.first, dst, kvstore_, adminClient_);
jobId_, spaceInfo_.spaceId_, partId, hostAddr, dst, kvstore_, adminClient_);
}
hostMapEntry.second.parts_.clear();
host.parts_.clear();
}
zone->calPartNum();
}

// all parts of lost zones have moved to active zones, then rebalance the active zones
int32_t totalPartNum = 0;
int32_t avgPartNum = 0;
for (auto& z : sortedActiveZones) {
totalPartNum += z->partNum_;
}
if (sortedActiveZones.size() == 0) {
LOG(ERROR) << "rebalance error: no active zones";
return {};
}
avgPartNum = totalPartNum / sortedActiveZones.size();
int32_t remainder = totalPartNum - avgPartNum * sortedActiveZones.size();
int32_t leftBegin = 0;
int32_t leftEnd = 0;
int32_t rightBegin = 0;
int32_t rightEnd = sortedActiveZones.size();
for (size_t i = 0; i < sortedActiveZones.size(); i++) {
if (avgPartNum <= sortedActiveZones[i]->partNum_) {
leftEnd = i;
break;
}
}
for (size_t i = leftEnd; i < sortedActiveZones.size(); i++) {
if (avgPartNum < sortedActiveZones[i]->partNum_) {
rightBegin = i;
break;
}
}
for (int32_t right = rightBegin; right < rightEnd;) {
Zone* srcZone = sortedActiveZones[right];
// if remainder > 0, some zones will hold avgPartNum + 1 parts; we prioritise letting the
// right-side zones hold them
if (srcZone->partNum_ == avgPartNum + 1 && remainder) {
right++;
remainder--;
continue;
}
if (srcZone->partNum_ == avgPartNum) {
right++;
continue;
}
std::vector<Host*>& sortedHosts = sortedZoneHosts[srcZone->zoneName_];
int32_t hostIndex = sortedHosts.size() - 1;
// to find a part to move,we prioritise moving parts from who has the most
for (; hostIndex >= 0; hostIndex--) {
std::set<PartitionID>& hostParts = sortedHosts[hostIndex]->parts_;
PartitionID movePart = -1;
for (PartitionID partId : hostParts) {
bool matched = false;
// to find a zone which does not contain the part in the left side to insert
for (int32_t leftIndex = leftBegin; leftIndex < leftEnd; leftIndex++) {
if (!sortedActiveZones[leftIndex]->partExist(partId)) {
HostAddr dst = insertPartIntoZone(sortedActiveZones[leftIndex], partId);
tasks.emplace_back(jobId_,
spaceInfo_.spaceId_,
partId,
sortedHosts[hostIndex]->host_,
dst,
kvstore_,
adminClient_);
movePart = partId;
int32_t newLeftIndex = leftIndex;
for (; newLeftIndex < leftEnd - 1; newLeftIndex++) {
if (sortedActiveZones[newLeftIndex]->partNum_ >
sortedActiveZones[newLeftIndex + 1]->partNum_) {
std::swap(sortedActiveZones[newLeftIndex], sortedActiveZones[newLeftIndex + 1]);
} else {
break;
}
}
// if the zone's part count reaches avgPartNum, it can't receive parts any more
if (newLeftIndex == leftEnd - 1 &&
sortedActiveZones[newLeftIndex]->partNum_ >= avgPartNum) {
leftEnd--;
}
// all zones in left side have reached avgPartNum,and now some of them will take
// avgPartNum+1 if there still has remainder
if (leftBegin == leftEnd) {
leftEnd = rightBegin;
}
matched = true;
break;
}
}
if (matched) {
break;
}
}
if (movePart != -1) {
hostParts.erase(movePart);
srcZone->partNum_--;
break;
}
}
for (int32_t i = hostIndex; i > 0; i--) {
if (sortedHosts[i]->parts_.size() <= sortedHosts[i - 1]->parts_.size()) {
std::swap(sortedHosts[i], sortedHosts[i - 1]);
} else {
break;
}
}
}
if (tasks.empty()) {
nebula::cpp2::ErrorCode rc = rebalanceActiveZones(&sortedActiveZones, &sortedZoneHosts, &tasks);

if (tasks.empty() || rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Balanced();
}
plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
for (BalanceTask& task : tasks) {
plan_->addTask(std::move(task));
}
nebula::cpp2::ErrorCode rc = plan_->saveInStore();
rc = plan_->saveInStore();
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("save balance zone plan failed");
}
Expand Down
7 changes: 7 additions & 0 deletions src/meta/processors/job/ZoneBalanceJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor {
folly::Future<Status> executeInternal() override;
Status buildBalancePlan() override;
nebula::cpp2::ErrorCode updateMeta();
/**
 * @brief Put a part on the first host of the zone's host list and re-order that list.
 *
 * @param sortedZoneHosts zone name -> hosts of that zone
 * @param zone            zone that receives the part; its part count is incremented
 * @param partId          partition to place
 * @return address of the host that received the part
 */
HostAddr insertPartIntoZone(std::map<std::string, std::vector<Host*>>* sortedZoneHosts,
Zone* zone,
PartitionID partId);
/**
 * @brief Move parts between the active zones so that their part counts even out.
 *
 * @param sortedActiveZones active zones; reordered in place
 * @param sortedZoneHosts   zone name -> hosts of that zone; host lists are reordered in place
 * @param tasks             output, one task is appended for every part that is moved
 * @return E_NO_HOSTS when there is no active zone, SUCCEEDED otherwise
 */
nebula::cpp2::ErrorCode rebalanceActiveZones(
std::vector<Zone*>* sortedActiveZones,
std::map<std::string, std::vector<Host*>>* sortedZoneHosts,
std::vector<BalanceTask>* tasks);

private:
std::vector<std::string> lostZones_;
Expand Down

0 comments on commit 67e6675

Please sign in to comment.