Skip to content

Commit

Permalink
Fix graph profiling data format
Browse files Browse the repository at this point in the history
  • Loading branch information
yixinglu committed Nov 17, 2022
1 parent 9a7b721 commit a044abf
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 95 deletions.
14 changes: 6 additions & 8 deletions src/graph/executor/StorageAccessExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,13 @@ StatusOr<std::vector<Value>> StorageAccessExecutor::buildRequestListByVidType(It
return internal::buildRequestList<std::string>(space, exprCtx, iter, expr, dedup, isCypher);
}

std::string StorageAccessExecutor::getStorageDetail(
optional_field_ref<const std::map<std::string, int32_t> &> ref) const {
if (ref.has_value()) {
auto content = util::join(*ref, [](auto &iter) -> std::string {
return folly::sformat("\n {}:{}(us)", iter.first, iter.second);
});
return "{" + content + "}";
folly::dynamic StorageAccessExecutor::getStorageDetail(
const std::map<std::string, int32_t> &profileDetail) const {
folly::dynamic profileData = folly::dynamic::object();
for (auto &p : profileDetail) {
profileData.insert(p.first, folly::sformat("{}(us)", p.second));
}
return "";
return profileData;
}

} // namespace graph
Expand Down
35 changes: 25 additions & 10 deletions src/graph/executor/StorageAccessExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,37 @@ class StorageAccessExecutor : public Executor {
return Status::OK();
}

folly::dynamic collectRespProfileData(const storage::cpp2::ResponseCommon &resp,
const std::tuple<HostAddr, int32_t, int32_t> &info,
size_t numVertices = 0UL,
size_t totalRpcTime = 0UL) const {
folly::dynamic stat = folly::dynamic::object();
stat.insert("address", std::get<0>(info).toString());
stat.insert("exec", folly::sformat("{}(us)", std::get<1>(info)));
stat.insert("total", folly::sformat("{}(us)", std::get<2>(info)));
if (numVertices > 0) {
stat.insert("vertices", numVertices);
}
if (totalRpcTime > 0) {
stat.insert("total_rpc_time", folly::sformat("{}(us)", totalRpcTime));
}
if (resp.latency_detail_us_ref().has_value()) {
stat.insert("storage_detail", getStorageDetail(*resp.get_latency_detail_us()));
}
return stat;
}

template <typename RESP>
void addStats(RESP &resp, std::unordered_map<std::string, std::string> &stats) const {
void addStats(storage::StorageRpcResponse<RESP> &resp,
std::unordered_map<std::string, std::string> &stats) const {
auto &hostLatency = resp.hostLatency();
for (size_t i = 0; i < hostLatency.size(); ++i) {
auto &info = hostLatency[i];
stats.emplace(folly::sformat("{} exec/total", std::get<0>(info).toString()),
folly::sformat("{}(us)/{}(us)", std::get<1>(info), std::get<2>(info)));
auto detail = getStorageDetail(resp.responses()[i].result_ref()->latency_detail_us_ref());
if (!detail.empty()) {
stats.emplace("storage_detail", detail);
}
auto info = collectRespProfileData(resp.responses()[i].get_result(), hostLatency[i]);
stats.emplace(folly::sformat("resp[{}]", i), folly::toPrettyJson(info));
}
}

std::string getStorageDetail(
apache::thrift::optional_field_ref<const std::map<std::string, int32_t> &> ref) const;
folly::dynamic getStorageDetail(const std::map<std::string, int32_t> &profileDetail) const;

bool isIntVidType(const SpaceInfo &space) const;

Expand Down
71 changes: 33 additions & 38 deletions src/graph/executor/algo/ShortestPathBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,13 @@ std::vector<Value> ShortestPathBase::handlePropResp(PropRpcResponse&& resps) {
return vertices;
}

std::string ShortestPathBase::getStorageDetail(
optional_field_ref<const std::map<std::string, int32_t>&> ref) const {
if (ref.has_value()) {
auto content = util::join(*ref, [](auto& iter) -> std::string {
return folly::sformat("{}:{}(us)", iter.first, iter.second);
});
return "{" + content + "}";
folly::dynamic ShortestPathBase::getStorageDetail(
const std::map<std::string, int32_t>& profileDetail) const {
folly::dynamic info = folly::dynamic::object();
for (auto& p : profileDetail) {
info.insert(p.first, folly::sformat("{}(us)", p.second));
}
return "";
return info;
}

Status ShortestPathBase::handleErrorCode(nebula::cpp2::ErrorCode code, PartitionID partId) const {
Expand Down Expand Up @@ -171,55 +169,52 @@ void ShortestPathBase::addStats(RpcResponse& resp,
size_t stepNum,
int64_t timeInUSec,
bool reverse) const {
folly::dynamic stats = folly::dynamic::array();
auto& hostLatency = resp.hostLatency();
std::stringstream ss;
ss << "{\n";
for (size_t i = 0; i < hostLatency.size(); ++i) {
size_t size = 0u;
auto& result = resp.responses()[i];
if (result.vertices_ref().has_value()) {
size = (*result.vertices_ref()).size();
}
auto& info = hostLatency[i];
ss << "{" << folly::sformat("{} exec/total/vertices: ", std::get<0>(info).toString())
<< folly::sformat("{}(us)/{}(us)/{},", std::get<1>(info), std::get<2>(info), size) << "\n"
<< folly::sformat("total_rpc_time: {}(us)", timeInUSec) << "\n";
auto detail = getStorageDetail(result.result.latency_detail_us_ref());
if (!detail.empty()) {
ss << folly::sformat("storage_detail: {}", detail);
folly::dynamic stat = folly::dynamic::object();
stat.insert("address", std::get<0>(info).toString());
stat.insert("exec", folly::sformat("{}(us)", std::get<1>(info)));
stat.insert("total", folly::sformat("{}(us)", std::get<2>(info)));
stat.insert("vertices", size);
stat.insert("total_rpc_time", folly::sformat("{}(us)", timeInUSec));
if (result.result.latency_detail_us_ref().has_value()) {
stat.insert("storage_detail", getStorageDetail(*result.result.get_latency_detail_us()));
}
ss << "\n}";
}
ss << "\n}";
if (reverse) {
statsLock_.lock();
stats_->emplace(folly::sformat("reverse step {}", stepNum), ss.str());
statsLock_.unlock();
} else {
statsLock_.lock();
stats_->emplace(folly::sformat("step {}", stepNum), ss.str());
statsLock_.unlock();
stats.push_back(folly::dynamic::object(folly::sformat("resp[{}]", i), stat));
}

auto key = folly::sformat("{}step[{}]", reverse ? "reverse " : "", stepNum);
statsLock_.lock();
stats_->emplace(key, folly::toPrettyJson(stats));
statsLock_.unlock();
}

void ShortestPathBase::addStats(PropRpcResponse& resp, int64_t timeInUSec) const {
folly::dynamic stats = folly::dynamic::array();
auto& hostLatency = resp.hostLatency();
std::stringstream ss;
ss << "{\n";
for (size_t i = 0; i < hostLatency.size(); ++i) {
auto& info = hostLatency[i];
ss << "{" << folly::sformat("{} exec/total: ", std::get<0>(info).toString())
<< folly::sformat("{}(us)/{}(us),", std::get<1>(info), std::get<2>(info)) << "\n"
<< folly::sformat("total_rpc_time: {}(us)", timeInUSec) << "\n";
auto detail = getStorageDetail(resp.responses()[i].result_ref()->latency_detail_us_ref());
if (!detail.empty()) {
ss << folly::sformat("storage_detail: {}", detail);
folly::dynamic stat = folly::dynamic::object();
stat.insert("address", std::get<0>(info).toString());
stat.insert("exec", folly::sformat("{}(us)", std::get<1>(info)));
stat.insert("total", folly::sformat("{}(us)", std::get<2>(info)));
stat.insert("total_rpc_time", folly::sformat("{}(us)", timeInUSec));
const auto& result = resp.responses()[i].get_result();
if (result.latency_detail_us_ref().has_value()) {
stat.insert("storage_detail", getStorageDetail(*result.get_latency_detail_us()));
}
ss << "\n}";
stats.push_back(std::move(stat));
}
ss << "\n}";

statsLock_.lock();
stats_->emplace(folly::sformat("get_prop "), ss.str());
stats_->emplace("get_prop", folly::toPrettyJson(stats));
statsLock_.unlock();
}

Expand Down
3 changes: 1 addition & 2 deletions src/graph/executor/algo/ShortestPathBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ class ShortestPathBase {
return Result::State::kSuccess;
}

std::string getStorageDetail(
apache::thrift::optional_field_ref<const std::map<std::string, int32_t>&> ref) const;
folly::dynamic getStorageDetail(const std::map<std::string, int32_t>& profileDetail) const;

protected:
const ShortestPath* pathNode_{nullptr};
Expand Down
10 changes: 2 additions & 8 deletions src/graph/executor/algo/SubgraphExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,8 @@ folly::Future<Status> SubgraphExecutor::getNeighbors() {
if (result.vertices_ref().has_value()) {
size = (*result.vertices_ref()).size();
}
auto& info = hostLatency[i];
otherStats_.emplace(
folly::sformat("{} exec/total/vertices", std::get<0>(info).toString()),
folly::sformat("{}(us)/{}(us)/{},", std::get<1>(info), std::get<2>(info), size));
auto detail = getStorageDetail(result.result.latency_detail_us_ref());
if (!detail.empty()) {
otherStats_.emplace("storage_detail", detail);
}
auto info = collectRespProfileData(result.result, hostLatency[i], size);
otherStats_.emplace(folly::sformat("resp[{}]", i), folly::toPrettyJson(info));
}
vids_.clear();
return handleResponse(std::move(resp));
Expand Down
10 changes: 2 additions & 8 deletions src/graph/executor/query/GetDstBySrcExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,8 @@ folly::Future<Status> GetDstBySrcExecutor::execute() {
if (result.dsts_ref().has_value()) {
size = (*result.dsts_ref()).size();
}
auto& info = hostLatency[i];
otherStats_.emplace(
folly::sformat("{} exec/total/vertices", std::get<0>(info).toString()),
folly::sformat("{}(us)/{}(us)/{},", std::get<1>(info), std::get<2>(info), size));
auto detail = getStorageDetail(result.result.latency_detail_us_ref());
if (!detail.empty()) {
otherStats_.emplace("storage_detail", detail);
}
auto info = collectRespProfileData(result.result, hostLatency[i], size);
otherStats_.emplace(folly::sformat("resp[{}]", i), folly::toPrettyJson(info));
}
return handleResponse(resp, this->gd_->colNames());
});
Expand Down
10 changes: 2 additions & 8 deletions src/graph/executor/query/GetNeighborsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,8 @@ folly::Future<Status> GetNeighborsExecutor::execute() {
if (result.vertices_ref().has_value()) {
size = (*result.vertices_ref()).size();
}
auto& info = hostLatency[i];
otherStats_.emplace(
folly::sformat("{} exec/total/vertices", std::get<0>(info).toString()),
folly::sformat("{}(us)/{}(us)/{},", std::get<1>(info), std::get<2>(info), size));
auto detail = getStorageDetail(result.result.latency_detail_us_ref());
if (!detail.empty()) {
otherStats_.emplace("storage_detail", detail);
}
auto info = collectRespProfileData(result.result, hostLatency[i], size);
otherStats_.emplace(folly::sformat("resp[{}]", i), folly::toPrettyJson(info));
}
return handleResponse(resp);
});
Expand Down
17 changes: 4 additions & 13 deletions src/graph/executor/query/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,27 +120,18 @@ Expression* TraverseExecutor::selectFilter() {
}

void TraverseExecutor::addStats(RpcResponse& resp, int64_t getNbrTimeInUSec) {
folly::dynamic stepInfo = folly::dynamic::array();
auto& hostLatency = resp.hostLatency();
std::stringstream ss;
ss << "{\n";
for (size_t i = 0; i < hostLatency.size(); ++i) {
size_t size = 0u;
auto& result = resp.responses()[i];
if (result.vertices_ref().has_value()) {
size = (*result.vertices_ref()).size();
}
auto& info = hostLatency[i];
ss << "{" << folly::sformat("{} exec/total/vertices: ", std::get<0>(info).toString())
<< folly::sformat("{}(us)/{}(us)/{},", std::get<1>(info), std::get<2>(info), size) << "\n"
<< folly::sformat("total_rpc_time: {}(us)", getNbrTimeInUSec) << "\n";
auto detail = getStorageDetail(result.result.latency_detail_us_ref());
if (!detail.empty()) {
ss << folly::sformat("storage_detail: {}", detail);
}
ss << "\n}";
auto info = collectRespProfileData(result.result, hostLatency[i], size, getNbrTimeInUSec);
stepInfo.push_back(folly::dynamic::object(folly::sformat("resp[{}]", i), info));
}
ss << "\n}";
otherStats_.emplace(folly::sformat("step {}", currentStep_), ss.str());
otherStats_.emplace(folly::sformat("step[{}]", currentStep_), folly::toPrettyJson(stepInfo));
}

folly::Future<Status> TraverseExecutor::handleResponse(RpcResponse&& resps) {
Expand Down

0 comments on commit a044abf

Please sign in to comment.