Skip to content

Commit

Permalink
Merge branch 'master' into memory_in_raft
Browse files Browse the repository at this point in the history
  • Loading branch information
Sophie-Xie authored Apr 21, 2022
2 parents 25f7162 + 2e2987f commit 4818e6a
Show file tree
Hide file tree
Showing 14 changed files with 232 additions and 111 deletions.
20 changes: 14 additions & 6 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,14 @@ void MetaClient::heartBeatThreadFunc() {
bgThread_->addDelayTask(
FLAGS_heartbeat_interval_secs * 1000, &MetaClient::heartBeatThreadFunc, this);
};
auto ret = heartbeat().get();
if (!ret.ok()) {
LOG(ERROR) << "Heartbeat failed, status:" << ret.status();
return;
// UNKNOWN is reserved for tools such as upgrader, in that case the ip/port is not set. We do
// not send heartbeat to meta to avoid writing error host info (e.g. Host("", 0))
if (options_.role_ != cpp2::HostRole::UNKNOWN) {
auto ret = heartbeat().get();
if (!ret.ok()) {
LOG(ERROR) << "Heartbeat failed, status:" << ret.status();
return;
}
}

// if MetaServer has some changes, refresh the localCache_
Expand Down Expand Up @@ -236,7 +240,9 @@ bool MetaClient::loadUsersAndRoles() {
}

bool MetaClient::loadData() {
if (localDataLastUpdateTime_ == metadLastUpdateTime_) {
// UNKNOWN role will skip heartbeat
if (options_.role_ != cpp2::HostRole::UNKNOWN &&
localDataLastUpdateTime_ == metadLastUpdateTime_) {
return true;
}

Expand Down Expand Up @@ -2958,7 +2964,9 @@ StatusOr<std::vector<RemoteListenerInfo>> MetaClient::getListenerHostTypeBySpace
}

bool MetaClient::loadCfg() {
if (options_.skipConfig_ || localCfgLastUpdateTime_ == metadLastUpdateTime_) {
// UNKNOWN role will skip heartbeat
if (options_.skipConfig_ || (options_.role_ != cpp2::HostRole::UNKNOWN &&
localCfgLastUpdateTime_ == metadLastUpdateTime_)) {
return true;
}
if (!configReady_ && !registerCfg()) {
Expand Down
1 change: 1 addition & 0 deletions src/common/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ std::vector<std::string> NebulaKeyUtils::snapshotPrefix(PartitionID partId) {
if (partId == 0) {
result.emplace_back("");
} else {
result.emplace_back(vertexPrefix(partId));
result.emplace_back(tagPrefix(partId));
result.emplace_back(edgePrefix(partId));
result.emplace_back(IndexKeyUtils::indexPrefix(partId));
Expand Down
3 changes: 1 addition & 2 deletions src/graph/executor/algo/BFSShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,12 @@ folly::Future<Status> BFSShortestPathExecutor::conjunctPath() {
std::vector<folly::Future<DataSet>> futures;
for (auto& vid : meetVids) {
batchVids.push_back(vid);
if (i == totalSize - 1 || batchVids.size() == batchSize) {
if (++i == totalSize || batchVids.size() == batchSize) {
auto future = folly::via(runner(), [this, vids = std::move(batchVids), oddStep]() {
return doConjunct(vids, oddStep);
});
futures.emplace_back(std::move(future));
}
i++;
}

return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
Expand Down
208 changes: 156 additions & 52 deletions src/graph/executor/algo/MultiShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,19 @@ folly::Future<Status> MultiShortestPathExecutor::execute() {
})
.thenValue([this](auto&& resp) {
UNUSED(resp);
preLeftPaths_.swap(leftPaths_);
preRightPaths_.swap(rightPaths_);
preRightPaths_ = rightPaths_;
// update history
for (auto& iter : leftPaths_) {
historyLeftPaths_[iter.first].insert(std::make_move_iterator(iter.second.begin()),
std::make_move_iterator(iter.second.end()));
}
for (auto& iter : rightPaths_) {
historyRightPaths_[iter.first].insert(std::make_move_iterator(iter.second.begin()),
std::make_move_iterator(iter.second.end()));
}
leftPaths_.clear();
rightPaths_.clear();

step_++;
DataSet ds;
ds.colNames = pathNode_->colNames();
Expand All @@ -58,24 +67,40 @@ void MultiShortestPathExecutor::init() {
for (; rIter->valid(); rIter->next()) {
auto& vid = rIter->getColumn(0);
if (rightVids.emplace(vid).second) {
preRightPaths_[vid].push_back({Path(Vertex(vid, {}), {})});
std::vector<Path> tmp({Path(Vertex(vid, {}), {})});
historyRightPaths_[vid].emplace(vid, tmp);
preRightPaths_[vid].emplace(vid, std::move(tmp));
}
}

std::set<Value> leftVids;
for (; lIter->valid(); lIter->next()) {
auto& vid = lIter->getColumn(0);
std::vector<Path> tmp({Path(Vertex(vid, {}), {})});
historyLeftPaths_[vid].emplace(vid, std::move(tmp));
leftVids.emplace(vid);
}
for (const auto& leftVid : leftVids) {
for (const auto& rightVid : rightVids) {
if (leftVid != rightVid) {
terminationMap_.insert({leftVid, {rightVid, true}});
terminationMap_.emplace(leftVid, std::make_pair(rightVid, true));
}
}
}
}

std::vector<Path> MultiShortestPathExecutor::createPaths(const std::vector<Path>& paths,
const Edge& edge) {
std::vector<Path> newPaths;
newPaths.reserve(paths.size());
for (const auto& p : paths) {
Path path = p;
path.steps.emplace_back(Step(Vertex(edge.dst, {}), edge.type, edge.name, edge.ranking, {}));
newPaths.emplace_back(std::move(path));
}
return newPaths;
}

Status MultiShortestPathExecutor::buildPath(bool reverse) {
auto iter = reverse ? ectx_->getResult(pathNode_->rightInputVar()).iter()
: ectx_->getResult(pathNode_->leftInputVar()).iter();
Expand All @@ -96,10 +121,23 @@ Status MultiShortestPathExecutor::buildPath(bool reverse) {
Path path;
path.src = Vertex(src, {});
path.steps.emplace_back(Step(Vertex(dst, {}), edge.type, edge.name, edge.ranking, {}));
currentPaths[dst].emplace_back(std::move(path));
auto foundDst = currentPaths.find(dst);
if (foundDst != currentPaths.end()) {
auto foundSrc = foundDst->second.find(src);
if (foundSrc != foundDst->second.end()) {
// same <src, dst>, different edge type or rank
foundSrc->second.emplace_back(std::move(path));
} else {
std::vector<Path> tmp({std::move(path)});
foundDst->second.emplace(src, std::move(tmp));
}
} else {
std::vector<Path> tmp({std::move(path)});
currentPaths[dst].emplace(src, std::move(tmp));
}
}
} else {
auto& historyPaths = reverse ? preRightPaths_ : preLeftPaths_;
auto& historyPaths = reverse ? historyRightPaths_ : historyLeftPaths_;
for (; iter->valid(); iter->next()) {
auto edgeVal = iter->getEdge();
if (UNLIKELY(!edgeVal.isEdge())) {
Expand All @@ -108,50 +146,93 @@ Status MultiShortestPathExecutor::buildPath(bool reverse) {
auto& edge = edgeVal.getEdge();
auto& src = edge.src;
auto& dst = edge.dst;
for (const auto& histPath : historyPaths[src]) {
Path path = histPath;
path.steps.emplace_back(Step(Vertex(dst, {}), edge.type, edge.name, edge.ranking, {}));
if (path.hasDuplicateVertices()) {
continue;
auto& prePaths = historyPaths[src];

auto foundHistDst = historyPaths.find(dst);
if (foundHistDst == historyPaths.end()) {
// dst not in history
auto foundDst = currentPaths.find(dst);
if (foundDst == currentPaths.end()) {
// dst not in current, new edge
for (const auto& prePath : prePaths) {
currentPaths[dst].emplace(prePath.first, createPaths(prePath.second, edge));
}
} else {
// dst in current
for (const auto& prePath : prePaths) {
auto newPaths = createPaths(prePath.second, edge);
auto foundSrc = foundDst->second.find(prePath.first);
if (foundSrc == foundDst->second.end()) {
foundDst->second.emplace(prePath.first, std::move(newPaths));
} else {
foundSrc->second.insert(foundSrc->second.begin(),
std::make_move_iterator(newPaths.begin()),
std::make_move_iterator(newPaths.end()));
}
}
}
} else {
// dst in history
auto& historyDstPaths = foundHistDst->second;
for (const auto& prePath : prePaths) {
if (historyDstPaths.find(prePath.first) != historyDstPaths.end()) {
// loop: a->b->c->a or a->b->c->b,
// filter out path that with duplicate vertex or have already been found before
continue;
}
auto foundDst = currentPaths.find(dst);
if (foundDst == currentPaths.end()) {
currentPaths[dst].emplace(prePath.first, createPaths(prePath.second, edge));
} else {
auto newPaths = createPaths(prePath.second, edge);
auto foundSrc = foundDst->second.find(prePath.first);
if (foundSrc == foundDst->second.end()) {
foundDst->second.emplace(prePath.first, std::move(newPaths));
} else {
foundSrc->second.insert(foundSrc->second.begin(),
std::make_move_iterator(newPaths.begin()),
std::make_move_iterator(newPaths.end()));
}
}
}
currentPaths[dst].emplace_back(std::move(path));
}
}
}

// set nextVid
const auto& nextVidVar = reverse ? pathNode_->rightVidVar() : pathNode_->leftVidVar();
setNextStepVid(currentPaths, nextVidVar);
return Status::OK();
}

DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter,
Interims::iterator endIter,
bool oddStep) {
auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_;
DataSet ds;
for (; startIter != endIter; ++startIter) {
auto found = rightPaths.find(startIter->first);
if (found == rightPaths.end()) {
continue;
}
for (const auto& lPath : startIter->second) {
const auto& srcVid = lPath.src.vid;
auto range = terminationMap_.equal_range(srcVid);
for (const auto& rPath : found->second) {
const auto& dstVid = rPath.src.vid;
for (auto iter = range.first; iter != range.second; ++iter) {
if (iter->second.first == dstVid) {
auto forwardPath = lPath;
auto backwardPath = rPath;
DataSet MultiShortestPathExecutor::doConjunct(
const std::vector<std::pair<Interims::iterator, Interims::iterator>>& iters) {
auto buildPaths =
[](const std::vector<Path>& leftPaths, const std::vector<Path>& rightPaths, DataSet& ds) {
for (const auto& leftPath : leftPaths) {
for (const auto& rightPath : rightPaths) {
auto forwardPath = leftPath;
auto backwardPath = rightPath;
backwardPath.reverse();
forwardPath.append(std::move(backwardPath));
if (forwardPath.hasDuplicateVertices()) {
continue;
}
Row row;
row.values.emplace_back(std::move(forwardPath));
ds.rows.emplace_back(std::move(row));
iter->second.second = false;
}
}
};

DataSet ds;
for (const auto& iter : iters) {
const auto& leftPaths = iter.first->second;
const auto& rightPaths = iter.second->second;
for (const auto& leftPath : leftPaths) {
auto range = terminationMap_.equal_range(leftPath.first);
for (const auto& rightPath : rightPaths) {
for (auto found = range.first; found != range.second; ++found) {
if (found->second.first == rightPath.first) {
buildPaths(leftPath.second, rightPath.second, ds);
found->second.second = false;
}
}
}
Expand All @@ -161,28 +242,51 @@ DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter,
}

folly::Future<bool> MultiShortestPathExecutor::conjunctPath(bool oddStep) {
size_t batchSize = leftPaths_.size() / static_cast<size_t>(FLAGS_num_operator_threads);
auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_;
size_t leftPathSize = leftPaths_.size();
size_t rightPathSize = rightPaths.size();
std::vector<folly::Future<DataSet>> futures;
size_t i = 0;
std::vector<std::pair<Interims::iterator, Interims::iterator>> pathIters;

auto startIter = leftPaths_.begin();
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
if (i++ == batchSize) {
auto endIter = leftIter;
endIter++;
auto future = folly::via(runner(), [this, startIter, endIter, oddStep]() {
return doConjunct(startIter, endIter, oddStep);
});
futures.emplace_back(std::move(future));
i = 0;
startIter = endIter;
size_t i = 0;
if (leftPathSize > rightPathSize) {
size_t batchSize = leftPathSize / static_cast<size_t>(FLAGS_num_operator_threads);
pathIters.reserve(batchSize);
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
auto rightIter = rightPaths.find(leftIter->first);
if (rightIter == rightPaths.end()) {
continue;
}
pathIters.emplace_back(leftIter, rightIter);
if (++i == batchSize) {
auto future = folly::via(
runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
futures.emplace_back(std::move(future));
pathIters.reserve(batchSize);
i = 0;
}
}
} else {
size_t batchSize = rightPathSize / static_cast<size_t>(FLAGS_num_operator_threads);
pathIters.reserve(batchSize);
for (auto rightIter = rightPaths.begin(); rightIter != rightPaths.end(); ++rightIter) {
auto leftIter = leftPaths_.find(rightIter->first);
if (leftIter == leftPaths_.end()) {
continue;
}
pathIters.emplace_back(leftIter, rightIter);
if (++i == batchSize) {
auto future = folly::via(
runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
futures.emplace_back(std::move(future));
pathIters.reserve(batchSize);
i = 0;
}
}
}
if (i != 0) {
auto endIter = leftPaths_.end();
auto future = folly::via(runner(), [this, startIter, endIter, oddStep]() {
return doConjunct(startIter, endIter, oddStep);
});
auto future =
folly::via(runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
futures.emplace_back(std::move(future));
}

Expand Down
13 changes: 8 additions & 5 deletions src/graph/executor/algo/MultiShortestPathExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,27 @@ class MultiShortestPathExecutor final : public Executor {
folly::Future<Status> execute() override;

private:
// k: dst, v: paths to dst
using Interims = std::unordered_map<Value, std::vector<Path>>;
// key: dst, value: {key : src, value: paths}
using Interims = std::unordered_map<Value, std::unordered_map<Value, std::vector<Path>>>;

void init();
std::vector<Path> createPaths(const std::vector<Path>& paths, const Edge& edge);
Status buildPath(bool reverse);
folly::Future<bool> conjunctPath(bool oddStep);
DataSet doConjunct(Interims::iterator startIter, Interims::iterator endIter, bool oddStep);
DataSet doConjunct(const std::vector<std::pair<Interims::iterator, Interims::iterator>>& iters);
void setNextStepVid(const Interims& paths, const string& var);

private:
const MultiShortestPath* pathNode_{nullptr};
size_t step_{1};
std::string terminationVar_;
// {src, <dst, true>}
std::unordered_multimap<Value, std::pair<Value, bool>> terminationMap_;
Interims leftPaths_;
Interims preLeftPaths_;
Interims preRightPaths_;
Interims rightPaths_;
Interims preRightPaths_;
Interims historyLeftPaths_;
Interims historyRightPaths_;
DataSet currentDs_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/algo/ProduceAllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ folly::Future<Status> ProduceAllPathsExecutor::conjunctPath() {

auto startIter = leftPaths_.begin();
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
if (i++ == batchSize) {
if (++i == batchSize) {
auto endIter = leftIter;
endIter++;
auto oddStepFuture = folly::via(
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/test/FindPathTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ TEST_F(FindPathTest, multiSourceShortestPath) {
{
DataSet expectLeftVid;
expectLeftVid.colNames = {nebula::kVid};
for (const auto& vid : {"a", "b", "c", "f", "g"}) {
for (const auto& vid : {"b", "f", "g"}) {
Row row;
row.values.emplace_back(vid);
expectLeftVid.rows.emplace_back(std::move(row));
Expand Down
Loading

0 comments on commit 4818e6a

Please sign in to comment.