Skip to content

Commit

Permalink
make balanceTask bucket size be equal to distinct part number
Browse files Browse the repository at this point in the history
  • Loading branch information
liwenhui-soul committed Dec 20, 2021
1 parent a10d398 commit 9277195
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 50 deletions.
1 change: 1 addition & 0 deletions src/meta/processors/job/BalanceJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ struct SpaceInfo {
std::string name_;
GraphSpaceID spaceId_;
int32_t replica_;
// zone_name -> zone
std::map<std::string, Zone> zones_;
};

Expand Down
64 changes: 41 additions & 23 deletions src/meta/processors/job/BalancePlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,13 @@ void BalancePlan::dispatchTasks() {
for (auto& task : tasks_) {
partTasks[std::make_pair(task.spaceId_, task.partId_)].emplace_back(index++);
}
buckets_.resize(std::min(partTasks.size(), (size_t)FLAGS_task_concurrency));
buckets_.resize(partTasks.size());
int32_t bucketIndex = 0;
for (auto it = partTasks.begin(); it != partTasks.end(); it++) {
size_t minNum = tasks_.size();
int32_t i = 0, minIndex = 0;
for (auto& bucket : buckets_) {
if (bucket.size() < minNum) {
minNum = bucket.size();
minIndex = i;
}
i++;
}
for (auto taskIndex : it->second) {
buckets_[minIndex].emplace_back(taskIndex);
buckets_[bucketIndex].emplace_back(taskIndex);
}
bucketIndex++;
}
}

Expand Down Expand Up @@ -66,17 +59,29 @@ void BalancePlan::invoke() {
stopped = stopped_;
}
if (finished) {
CHECK_EQ(j, this->buckets_[i].size() - 1);
CHECK_EQ(j, buckets_[i].size() - 1);
saveInStore(true);
onFinished_(stopped ? meta::cpp2::JobStatus::STOPPED
: (failed_ ? meta::cpp2::JobStatus::FAILED
: meta::cpp2::JobStatus::FINISHED));
} else if (j + 1 < this->buckets_[i].size()) {
auto& task = this->tasks_[this->buckets_[i][j + 1]];
} else if (j + 1 < buckets_[i].size()) {
auto& task = tasks_[buckets_[i][j + 1]];
if (stopped) {
task.ret_ = BalanceTaskResult::INVALID;
}
task.invoke();
} else {
size_t index = curIndex_.fetch_add(1, std::memory_order_relaxed);
if (index < buckets_.size()) {
Bucket& bucket = buckets_[index];
if (!bucket.empty()) {
auto& task = tasks_[bucket[0]];
if (stopped) {
task.ret_ = BalanceTaskResult::INVALID;
}
task.invoke();
}
}
}
}; // onFinished
tasks_[taskIndex].onError_ = [this, i, j, taskIndex]() {
Expand All @@ -95,28 +100,41 @@ void BalancePlan::invoke() {
stopped = stopped_;
}
if (finished) {
CHECK_EQ(j, this->buckets_[i].size() - 1);
CHECK_EQ(j, buckets_[i].size() - 1);
onFinished_(stopped ? meta::cpp2::JobStatus::STOPPED : meta::cpp2::JobStatus::FAILED);
} else if (j + 1 < this->buckets_[i].size()) {
auto& task = this->tasks_[this->buckets_[i][j + 1]];
} else if (j + 1 < buckets_[i].size()) {
auto& task = tasks_[buckets_[i][j + 1]];
if (tasks_[taskIndex].spaceId_ == task.spaceId_ &&
tasks_[taskIndex].partId_ == task.partId_) {
LOG(INFO) << "Skip the task for the same partId " << task.partId_;
task.ret_ = BalanceTaskResult::FAILED;
}
if (stopped) {
task.ret_ = BalanceTaskResult::INVALID;
}
task.ret_ = BalanceTaskResult::INVALID;
task.invoke();
} else {
size_t index = curIndex_.fetch_add(1, std::memory_order_relaxed);
if (index < buckets_.size()) {
Bucket& bucket = buckets_[index];
if (!bucket.empty()) {
auto& task = tasks_[bucket[0]];
if (stopped) {
task.ret_ = BalanceTaskResult::INVALID;
}
task.invoke();
}
}
}
}; // onError
} // for (auto j = 0; j < buckets_[i].size(); j++)
} // for (auto i = 0; i < buckets_.size(); i++)

saveInStore(true);
for (auto& bucket : buckets_) {
if (!bucket.empty()) {
tasks_[bucket[0]].invoke();
uint32 bucketSize = buckets_.size();
int32_t concurrency = std::min(FLAGS_task_concurrency, bucketSize);
curIndex_.store(concurrency, std::memory_order_relaxed);
for (int32_t i = 0; i < concurrency; i++) {
if (!buckets_[i].empty()) {
tasks_[buckets_[i][0]].invoke();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/BalancePlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ namespace nebula {
namespace meta {

class BalancePlan {
friend class Balancer;
friend class DataBalanceJobExecutor;
FRIEND_TEST(BalanceTest, BalancePlanTest);
FRIEND_TEST(BalanceTest, NormalTest);
Expand Down Expand Up @@ -98,6 +97,7 @@ class BalancePlan {
// List of task index in tasks_;
using Bucket = std::vector<int32_t>;
std::vector<Bucket> buckets_;
std::atomic<int32_t> curIndex_;
};

} // namespace meta
Expand Down
36 changes: 19 additions & 17 deletions src/meta/processors/job/DataBalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ folly::Future<Status> DataBalanceJobExecutor::executeInternal() {
Status DataBalanceJobExecutor::buildBalancePlan() {
std::map<std::string, std::vector<Host*>> lostZoneHost;
std::map<std::string, std::vector<Host*>> activeSortedHost;
for (auto& p : spaceInfo_.zones_) {
for (auto& ph : p.second.hosts_) {
activeSortedHost[p.first].push_back(&ph.second);
for (auto& zoneMapEntry : spaceInfo_.zones_) {
for (auto& hostMapEntry : zoneMapEntry.second.hosts_) {
activeSortedHost[zoneMapEntry.first].push_back(&hostMapEntry.second);
}
}
for (HostAddr ha : lostHosts_) {
if (!spaceInfo_.hasHost(ha)) {
for (HostAddr hosts : lostHosts_) {
if (!spaceInfo_.hasHost(hosts)) {
return Status::Error(
"Host %s does not belong to space %d", ha.toString().c_str(), spaceInfo_.spaceId_);
"Host %s does not belong to space %d", hosts.toString().c_str(), spaceInfo_.spaceId_);
}
for (auto& zoneMapEntry : spaceInfo_.zones_) {
auto it = zoneMapEntry.second.hosts_.find(ha);
auto it = zoneMapEntry.second.hosts_.find(hosts);
if (it != zoneMapEntry.second.hosts_.end()) {
lostZoneHost[zoneMapEntry.first].push_back(&it->second);
std::vector<Host*>& hvec = activeSortedHost[zoneMapEntry.first];
Expand All @@ -62,14 +62,15 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
});
}
plan_.reset(new BalancePlan(jobDescription_, kvstore_, adminClient_));
for (auto& p : lostZoneHost) {
std::vector<Host*>& hvec = activeSortedHost[p.first];
for (Host* h : p.second) {
for (PartitionID partId : h->parts_) {
// move parts of lost hosts to active hosts in the same zone
for (auto& zoneHostEntry : lostZoneHost) {
std::vector<Host*>& hvec = activeSortedHost[zoneHostEntry.first];
for (Host* host : zoneHostEntry.second) {
for (PartitionID partId : host->parts_) {
Host* dstHost = hvec.front();
dstHost->parts_.insert(partId);
plan_->addTask(BalanceTask(
jobId_, spaceInfo_.spaceId_, partId, h->ha_, dstHost->ha_, kvstore_, adminClient_));
jobId_, spaceInfo_.spaceId_, partId, host->ha_, dstHost->ha_, kvstore_, adminClient_));
for (size_t i = 0; i < hvec.size() - 1; i++) {
if (hvec[i]->parts_.size() > hvec[i + 1]->parts_.size()) {
std::swap(hvec[i], hvec[i + 1]);
Expand All @@ -78,10 +79,11 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
}
}
}
h->parts_.clear();
host->parts_.clear();
}
}
lostZoneHost.clear();
// rebalance for hosts in a zone
auto balanceHostVec = [this](std::vector<Host*>& hostVec) -> std::vector<BalanceTask> {
size_t totalPartNum = 0;
size_t avgPartNum = 0;
Expand All @@ -101,9 +103,9 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
break;
}
}
for (size_t i = 0; i < hostVec.size(); i++) {
for (size_t i = leftEnd; i < hostVec.size(); i++) {
rightBegin = i;
if (avgPartNum < hostVec[i]->parts_.size()) {
rightBegin = i;
break;
}
}
Expand Down Expand Up @@ -145,8 +147,8 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
}
return tasks;
};
for (auto& p : activeSortedHost) {
std::vector<Host*>& hvec = p.second;
for (auto& pair : activeSortedHost) {
std::vector<Host*>& hvec = pair.second;
std::vector<BalanceTask> tasks = balanceHostVec(hvec);
for (BalanceTask& task : tasks) {
plan_->addTask(std::move(task));
Expand Down
6 changes: 4 additions & 2 deletions src/meta/processors/job/ZoneBalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::updateMeta() {
for (std::string& zn : lostZones_) {
spaceInfo_.zones_.erase(zn);
}
for (auto& p : spaceInfo_.zones_) {
zones.push_back(p.first);
for (auto& zoneMapEntry : spaceInfo_.zones_) {
zones.emplace_back(zoneMapEntry.first);
}
properties.set_zone_names(std::move(zones));
std::vector<kvstore::KV> data;
Expand Down Expand Up @@ -177,6 +177,7 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
return ha;
};

// move parts of lost zones to active zones
for (auto& zoneMapEntry : lostZones) {
Zone* zone = zoneMapEntry.second;
for (auto& hostMapEntry : zone->hosts_) {
Expand All @@ -190,6 +191,7 @@ Status ZoneBalanceJobExecutor::buildBalancePlan() {
zone->calPartNum();
}

// all parts of lost zones have moved to active zones, then rebalance the active zones
int32_t totalPartNum = 0;
int32_t avgPartNum = 0;
for (auto& z : sortedActiveZones) {
Expand Down
8 changes: 4 additions & 4 deletions src/parser/AdminSentences.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,9 @@ std::string AdminJobSentence::toString() const {
return "INGEST";
case meta::cpp2::AdminCmd::DATA_BALANCE:
if (paras_.empty()) {
return "SUBMIT JOB BALANCE DATA";
return "SUBMIT JOB BALANCE IN ZONE";
} else {
std::string str = "SUBMIT JOB BALANCE DATA REMOVE";
std::string str = "SUBMIT JOB BALANCE IN ZONE REMOVE";
for (size_t i = 0; i < paras_.size(); i++) {
auto &s = paras_[i];
str += i == 0 ? " " + s : ", " + s;
Expand All @@ -258,9 +258,9 @@ std::string AdminJobSentence::toString() const {
}
case meta::cpp2::AdminCmd::ZONE_BALANCE:
if (paras_.empty()) {
return "SUBMIT JOB BALANCE ZONE";
return "SUBMIT JOB BALANCE ACROSS ZONE";
} else {
std::string str = "SUBMIT JOB BALANCE ZONE REMOVE";
std::string str = "SUBMIT JOB BALANCE ACROSS ZONE REMOVE";
for (size_t i = 0; i < paras_.size(); i++) {
auto &s = paras_[i];
str += i == 0 ? " " + s : ", " + s;
Expand Down
6 changes: 3 additions & 3 deletions src/parser/test/ParserTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3153,10 +3153,10 @@ TEST_F(ParserTest, JobTest) {
checkTest("SUBMIT JOB FLUSH 111", "SUBMIT JOB FLUSH 111");
checkTest("SUBMIT JOB STATS", "SUBMIT JOB STATS");
checkTest("SUBMIT JOB STATS 111", "SUBMIT JOB STATS 111");
checkTest("SUBMIT JOB BALANCE DATA", "SUBMIT JOB BALANCE DATA");
checkTest("SUBMIT JOB BALANCE IN ZONE", "SUBMIT JOB BALANCE IN ZONE");
checkTest(
"SUBMIT JOB BALANCE DATA REMOVE 192.168.0.1:50000, 192.168.0.1:50001, 192.168.0.1:50002",
"SUBMIT JOB BALANCE DATA REMOVE \"192.168.0.1\":50000, \"192.168.0.1\":50001, "
"SUBMIT JOB BALANCE IN ZONE REMOVE 192.168.0.1:50000, 192.168.0.1:50001, 192.168.0.1:50002",
"SUBMIT JOB BALANCE IN ZONE REMOVE \"192.168.0.1\":50000, \"192.168.0.1\":50001, "
"\"192.168.0.1\":50002");
checkTest("SUBMIT JOB BALANCE LEADER", "SUBMIT JOB BALANCE LEADER");
checkTest("SHOW JOBS", "SHOW JOBS");
Expand Down

0 comments on commit 9277195

Please sign in to comment.