diff --git a/src/graph/executor/PathBaseExecutor.cpp b/src/graph/executor/PathBaseExecutor.cpp index 11eaa522908..e14e7cb1ecf 100644 --- a/src/graph/executor/PathBaseExecutor.cpp +++ b/src/graph/executor/PathBaseExecutor.cpp @@ -175,7 +175,9 @@ void PathBaseExecutor::addGetNeighborStats(RpcResponse& resp, if (result.vertices_ref().has_value()) { size = (*result.vertices_ref()).size(); } - auto info = util::collectRespProfileData(result.result, hostLatency[i], size, timeInUSec); + // auto info = util::collectRespProfileData(result.result, hostLatency[i], size, timeInUSec); + UNUSED(timeInUSec); + auto info = util::collectRespProfileData(result.result, hostLatency[i], size); stats.push_back(std::move(info)); } @@ -190,7 +192,9 @@ void PathBaseExecutor::addGetPropStats(PropRpcResponse& resp, int64_t timeInUSec auto& hostLatency = resp.hostLatency(); for (size_t i = 0; i < hostLatency.size(); ++i) { const auto& result = resp.responses()[i].get_result(); - auto info = util::collectRespProfileData(result, hostLatency[i], 0, timeInUSec); + // auto info = util::collectRespProfileData(result, hostLatency[i], 0, timeInUSec); + UNUSED(timeInUSec); + auto info = util::collectRespProfileData(result, hostLatency[i], 0); stats.push_back(std::move(info)); } diff --git a/src/graph/executor/algo/AllPathsExecutor.cpp b/src/graph/executor/algo/AllPathsExecutor.cpp index a5b6431e5df..0f7a23bc8d0 100644 --- a/src/graph/executor/algo/AllPathsExecutor.cpp +++ b/src/graph/executor/algo/AllPathsExecutor.cpp @@ -9,6 +9,7 @@ DEFINE_uint32(path_threshold_size, 100, ""); DEFINE_uint32(path_threshold_ratio, 2, ""); +DEFINE_uint32(path_batch_size, 3000, ""); namespace nebula { namespace graph { @@ -16,7 +17,7 @@ folly::Future AllPathsExecutor::execute() { SCOPED_TIMER(&execTime_); pathNode_ = asNode(node()); noLoop_ = pathNode_->noLoop(); - steps_ = pathNode_->steps(); + maxStep_ = pathNode_->steps(); withProp_ = pathNode_->withProp(); if (pathNode_->limit() != -1) { limit_ = pathNode_->limit(); @@ -65,7 +66,7 @@ void AllPathsExecutor::init() { AllPathsExecutor::Direction AllPathsExecutor::direction() { auto leftSize = leftVids_.size(); auto rightSize = rightVids_.size(); - if (leftSteps_ + rightSteps_ + 1 == steps_) { + if (leftSteps_ + rightSteps_ + 1 == maxStep_) { if (leftSize > rightSize) { ++rightSteps_; return Direction::kRight; @@ -112,7 +113,7 @@ folly::Future AllPathsExecutor::doAllPaths() { return folly::makeFuture(std::move(resp)); } } - if (leftSteps_ + rightSteps_ < steps_) { + if (leftSteps_ + rightSteps_ < maxStep_) { if (leftVids_.empty() || rightVids_.empty()) { return buildResult(); } else { @@ -254,6 +255,11 @@ void AllPathsExecutor::expandFromLeft(GetNeighborsIter* iter) { } folly::Future AllPathsExecutor::buildResult() { + // when the key in the right adjacency list does not exist in the left adjacency list + // add key & values to the left adjacency list, + // if key exists, discard the right adjacency's key & values + // because the right adjacency list may have fewer edges + // a->c->o, a->b, c->f, f->o for (auto& rAdj : rightAdjList_) { auto& src = rAdj.first; auto iter = leftAdjList_.find(src); @@ -285,105 +291,156 @@ folly::Future AllPathsExecutor::buildResult() { } } } + // asyn return + return folly::via(runner()) + .thenValue([this]() { + if (!withProp_ || emptyPropVids_.empty()) { + return finish(ResultBuilder().value(Value(std::move(result_))).build()); + } + return getProps(emptyPropVids_, pathNode_->vertexProps()); + }) + .thenValue([this](std::vector&& vertices) { + for (auto& vertex : vertices) { + if (vertex.empty()) { + continue; + } + auto iter = leftAdjList_.find(vertex); + if (iter != leftAdjList_.end()) { + auto val = iter->first; + auto& mutableVertex = val.mutableVertex(); + mutableVertex.tags.swap(vertex.mutableVertex().tags); + } + } + return finish(ResultBuilder().value(Value(std::move(result_))).build()); + }); - auto futures = buildPathMultiJobs(); - folly::collect(futures).via(runner()).thenValue([this](std::vector>&& resp) { - DLOG(ERROR) << "collect resp 's size " << resp.size(); - for (auto& rows : resp) { - DLOG(ERROR) << "rows ' size " << rows.size(); - if (rows.empty()) { - continue; - } - result_.rows.insert(result_.rows.end(), - std::make_move_iterator(rows.begin()), - std::make_move_iterator(rows.end())); - } - DLOG(ERROR) << " result' s size " << result_.rows.size(); - DLOG(ERROR) << "reuslt is " << result_.toString(); - if (limit_ != std::numeric_limits::max() && limit_ < result_.rows.size()) { - result_.rows.resize(limit_); - } - }); - DLOG(ERROR) << "test"; + // auto futures = buildPathMultiJobs(); + // folly::collect(futures).via(runner()).thenValue([this](std::vector>&& resp) { + // DLOG(ERROR) << "collect resp 's size " << resp.size(); + // for (auto& rows : resp) { + // DLOG(ERROR) << "rows ' size " << rows.size(); + // if (rows.empty()) { + // continue; + // } + // result_.rows.insert(result_.rows.end(), + // std::make_move_iterator(rows.begin()), + // std::make_move_iterator(rows.end())); + // } + // DLOG(ERROR) << " result' s size " << result_.rows.size(); + // DLOG(ERROR) << "reuslt is " << result_.toString(); + // if (limit_ != std::numeric_limits::max() && limit_ < result_.rows.size()) { + // result_.rows.resize(limit_); + // } + // }); + // DLOG(ERROR) << "test"; - if (!withProp_ || emptyPropVids_.empty()) { - return finish(ResultBuilder().value(Value(std::move(result_))).build()); - } + // if (!withProp_ || emptyPropVids_.empty()) { + // return finish(ResultBuilder().value(Value(std::move(result_))).build()); + // } - auto future = getProps(emptyPropVids_, pathNode_->vertexProps()); - return future.via(runner()).thenValue([this](auto&& vertices) { - for (auto& vertex : vertices) { - if (vertex.empty()) { - continue; - } - auto iter = leftAdjList_.find(vertex); - if (iter != leftAdjList_.end()) { - auto val = iter->first; - auto& mutableVertex = val.mutableVertex(); - mutableVertex.tags.swap(vertex.mutableVertex().tags); - } - } - return finish(ResultBuilder().value(Value(std::move(result_))).build()); - }); + // auto future = getProps(emptyPropVids_, pathNode_->vertexProps()); + // return future.via(runner()).thenValue([this](auto&& vertices) { + // for (auto& vertex : vertices) { + // if (vertex.empty()) { + // continue; + // } + // auto iter = leftAdjList_.find(vertex); + // if (iter != leftAdjList_.end()) { + // auto val = iter->first; + // auto& mutableVertex = val.mutableVertex(); + // mutableVertex.tags.swap(vertex.mutableVertex().tags); + // } + // } + // return finish(ResultBuilder().value(Value(std::move(result_))).build()); + // }); } -std::vector>> AllPathsExecutor::buildPathMultiJobs() { - std::vector>> futures; - nebula::thread::GenericThreadPool pool; - pool.start(FLAGS_num_path_thread, "build-path"); +std::vector AllPathsExecutor::doBuildPath( + size_t step, + size_t start, + size_t end, + std::shared_ptr>> edgeLists) { + if (cnt_.load(std::memory_order_relaxed) >= limit_) { + return std::vector(); + } - using funType = std::function(size_t, Value, std::vector>)>; - funType task = [this, &task, &pool, &futures]( - size_t step, Value&& src, std::vector>&& edgeLists) { - DLOG(ERROR) << "step is : " << step << " src : " << src.toString() << " edgeLists.size() " - << edgeLists.size(); - auto& adjList = leftAdjList_; - std::vector result; - std::vector> newEdgeLists; - newEdgeLists.reserve(edgeLists.size()); + std::vector> futures; + auto& adjList = leftAdjList_; + std::vector paths; + auto newEdgeLists = std::make_shared>>(edgeLists->size()); - for (auto& edgeList : edgeLists) { - auto& dst = edgeList.back().getEdge().dst; - auto dstIter = rightInitVids_.find(dst); - if (dstIter != rightInitVids_.end()) { - Row row; - row.values.emplace_back(src); - row.values.emplace_back(List(edgeList)); - row.values.emplace_back(*dstIter); - result.emplace_back(std::move(row)); - ++cnt_; + for (auto i = start; i < end; ++i) { + auto& edgeList = (*edgeLists)[i]; + auto& dst = edgeList.back().getEdge().dst; + auto dstIter = rightInitVids_.find(dst); + if (dstIter != rightInitVids_.end()) { + Row row; + row.values.emplace_back(List(edgeList)); + row.values.emplace_back(*dstIter); + paths.emplace_back(std::move(row)); + ++cnt_; + if (cnt_.load(std::memory_order_relaxed) >= limit_) { + break; + } + } + if (step <= maxStep_) { + auto adjIter = adjList.find(dst); + if (adjIter == adjList.end()) { + continue; } - if (step <= steps_) { - auto adjIter = adjList.find(dst); - if (adjIter == adjList.end()) { + + auto& adjedges = adjIter->second; + for (auto& edge : adjedges) { + if (hasSameEdge(edgeList, edge.getEdge())) { continue; } + auto newEdgeList = edgeList; + newEdgeList.emplace_back(adjIter->first); + newEdgeList.emplace_back(edge); + newEdgeLists->emplace_back(std::move(newEdgeList)); + } + } + } - auto& adjedges = adjIter->second; - for (auto& edge : adjedges) { - if (hasSameEdge(edgeList, edge.getEdge())) { + auto newEdgeListsSize = newEdgeLists->size(); + if (step > maxStep_ || newEdgeListsSize == 0) { + return paths; + } + DLOG(ERROR) << " newEdgeLists.size() " << newEdgeLists->size(); + if (newEdgeListsSize < FLAGS_path_batch_size) { + futures.emplace_back(folly::via(runner(), [this, step, newEdgeListsSize, newEdgeLists]() { + doBuildPath(step + 1, 0, newEdgeListsSize, newEdgeLists); + })); + } else { + for (size_t _start = 0; _start < newEdgeListsSize; _start += FLAGS_path_batch_size) { + auto trueEnd = _start + FLAGS_path_batch_size; + auto _end = trueEnd > newEdgeListsSize ? newEdgeListsSize : trueEnd; + futures.emplace_back(folly::via(runner(), [this, step, _start, _end, newEdgeLists]() { + doBuildPath(step + 1, _start, _end, newEdgeLists); + })); + } + } + return folly::collect(futures).via(runner()).thenValue( + [&paths](std::vector>&& resp) { + DLOG(ERROR) << "collect resp 's size " << resp.size(); + // result_.rows.swap(paths); + for (auto& rows : resp) { + DLOG(ERROR) << "rows ' size " << rows.size(); + if (rows.empty()) { continue; } - auto newEdgeList = edgeList; - newEdgeList.emplace_back(adjIter->first); - newEdgeList.emplace_back(edge); - newEdgeLists.emplace_back(std::move(newEdgeList)); + result_.rows.insert(result_.rows.end(), + std::make_move_iterator(rows.begin()), + std::make_move_iterator(rows.end())); } - } - } - if (step > steps_) { - DLOG(ERROR) << " stop "; - pool.stop(); - } - if (cnt_.load(std::memory_order_relaxed) >= limit_) { - pool.stop(); - } else if (step <= steps_ && !newEdgeLists.empty()) { - DLOG(ERROR) << " newEdgeLists.size() " << newEdgeLists.size(); - futures.emplace_back(pool.addTask(task, step + 1, std::move(src), std::move(newEdgeLists))); - } - return result; - }; + DLOG(ERROR) << " result' s size " << result_.rows.size(); + DLOG(ERROR) << "reuslt is " << result_.toString(); + return result_; + }); +} +void AllPathsExecutor::buildPathMultiJobs() { + auto edgeLists = std::make_shared>>(); for (auto& vid : leftInitVids_) { auto vidIter = leftAdjList_.find(vid); if (vidIter == leftAdjList_.end()) { @@ -394,101 +451,98 @@ std::vector>> AllPathsExecutor::buildPathMult if (adjEdges.empty()) { continue; } - std::vector> edgeLists; - edgeLists.reserve(adjEdges.size()); + edgeLists->reserve(adjEdges.size() + edgeLists->size()); for (auto& edge : adjEdges) { - edgeLists.emplace_back(std::vector({edge})); + edgeLists->emplace_back(std::vector({src, edge})); } - size_t step = 2; - futures.emplace_back(pool.addTask(task, step, src, std::move(edgeLists))); } - pool.wait(); - return futures; + size_t step = 2; + doBuildPath(step, 0, edgeLists->size(), edgeLists); } -void AllPathsExecutor::buildPath() { - for (const auto& vid : leftInitVids_) { - auto paths = doBuildPath(vid); - if (paths.empty()) { - continue; - } - result_.rows.insert(result_.rows.end(), - std::make_move_iterator(paths.begin()), - std::make_move_iterator(paths.end())); - } -} +// void AllPathsExecutor::buildPath() { +// for (const auto& vid : leftInitVids_) { +// auto paths = doBuildPath(vid); +// if (paths.empty()) { +// continue; +// } +// result_.rows.insert(result_.rows.end(), +// std::make_move_iterator(paths.begin()), +// std::make_move_iterator(paths.end())); +// } +// } -std::vector AllPathsExecutor::doBuildPath(const Value& vid) { - auto& adjList = leftAdjList_; - auto vidIter = adjList.find(vid); - if (vidIter == adjList.end()) { - return std::vector(); - } - auto& src = vidIter->first; - auto& adjEdges = vidIter->second; - if (adjEdges.empty()) { - return std::vector(); - } +// std::vector AllPathsExecutor::doBuildPath(const Value& vid) { +// auto& adjList = leftAdjList_; +// auto vidIter = adjList.find(vid); +// if (vidIter == adjList.end()) { +// return std::vector(); +// } +// auto& src = vidIter->first; +// auto& adjEdges = vidIter->second; +// if (adjEdges.empty()) { +// return std::vector(); +// } - std::vector result; - result.reserve(adjEdges.size()); +// std::vector result; +// result.reserve(adjEdges.size()); - std::queue*> queue; - std::list>> holder; - for (auto& edge : adjEdges) { - auto ptr = std::make_unique>(std::vector({edge})); - queue.emplace(ptr.get()); - holder.emplace_back(std::move(ptr)); - } +// std::queue*> queue; +// std::list>> holder; +// for (auto& edge : adjEdges) { +// auto ptr = std::make_unique>(std::vector({edge})); +// queue.emplace(ptr.get()); +// holder.emplace_back(std::move(ptr)); +// } - size_t step = 1; - size_t adjSize = queue.size(); - while (!queue.empty()) { - auto edgeListPtr = queue.front(); - queue.pop(); - --adjSize; +// size_t step = 1; +// size_t adjSize = queue.size(); +// while (!queue.empty()) { +// auto edgeListPtr = queue.front(); +// queue.pop(); +// --adjSize; - auto& dst = edgeListPtr->back().getEdge().dst; - auto dstIter = rightInitVids_.find(dst); - if (dstIter != rightInitVids_.end()) { - Row row; - row.values.emplace_back(src); - row.values.emplace_back(List(*edgeListPtr)); - row.values.emplace_back(*dstIter); - result.emplace_back(std::move(row)); - } +// auto& dst = edgeListPtr->back().getEdge().dst; +// auto dstIter = rightInitVids_.find(dst); +// if (dstIter != rightInitVids_.end()) { +// Row row; +// row.values.emplace_back(src); +// row.values.emplace_back(List(*edgeListPtr)); +// row.values.emplace_back(*dstIter); +// result.emplace_back(std::move(row)); +// } - auto adjIter = adjList.find(dst); - if (adjIter == adjList.end()) { - if (adjSize == 0) { - if (++step > steps_) { - break; - } - adjSize = queue.size(); - } - continue; - } +// auto adjIter = adjList.find(dst); +// if (adjIter == adjList.end()) { +// if (adjSize == 0) { +// if (++step > maxStep_) { +// break; +// } +// adjSize = queue.size(); +// } +// continue; +// } - auto& adjedges = adjIter->second; - for (auto& edge : adjedges) { - if (hasSameEdge(*edgeListPtr, edge.getEdge())) { - continue; - } - auto newEdgeListPtr = std::make_unique>(*edgeListPtr); - newEdgeListPtr->emplace_back(adjIter->first); - newEdgeListPtr->emplace_back(edge); - queue.emplace(newEdgeListPtr.get()); - holder.emplace_back(std::move(newEdgeListPtr)); - } - if (adjSize == 0) { - if (++step > steps_) { - break; - } - adjSize = queue.size(); - } - } - return result; -} +// auto& adjedges = adjIter->second; +// for (auto& edge : adjedges) { +// if (hasSameEdge(*edgeListPtr, edge.getEdge())) { +// continue; +// } +// auto newEdgeListPtr = std::make_unique>(*edgeListPtr); +// newEdgeListPtr->emplace_back(adjIter->first); +// newEdgeListPtr->emplace_back(edge); +// queue.emplace(newEdgeListPtr.get()); +// holder.emplace_back(std::move(newEdgeListPtr)); +// } +// if (adjSize == 0) { +// if (++step > maxStep_) { +// break; +// } +// adjSize = queue.size(); +// } +// } +// return result; +// } } // namespace graph } // namespace nebula diff --git a/src/graph/executor/algo/AllPathsExecutor.h b/src/graph/executor/algo/AllPathsExecutor.h index f5eab8d1206..28747c17020 100644 --- a/src/graph/executor/algo/AllPathsExecutor.h +++ b/src/graph/executor/algo/AllPathsExecutor.h @@ -38,9 +38,14 @@ class AllPathsExecutor final : public PathBaseExecutor { std::vector doBuildPath(const Value& vid); - void buildPath(); + std::vector doBuildPath(size_t step, + size_t start, + size_t end, + std::shared_ptr>> edgeLists); - std::vector>> buildPathMultiJobs(); + // void buildPath(); + + void buildPathMultiJobs(); folly::Future buildResult(); @@ -50,7 +55,7 @@ class AllPathsExecutor final : public PathBaseExecutor { bool noLoop_{false}; size_t limit_{std::numeric_limits::max()}; std::atomic cnt_{0}; - size_t steps_{0}; + size_t maxStep_{0}; size_t leftSteps_{0}; size_t rightSteps_{0};