Remove the lock from collectResponse in storage client #5338

Merged · 2 commits · Feb 14, 2023
Changes from 1 commit
28 changes: 14 additions & 14 deletions src/clients/storage/StorageClientBase-inl.h
@@ -106,44 +106,44 @@ StorageClientBase<ClientType, ClientManagerType>::collectResponse(
   memory::MemoryCheckGuard guard;
   StorageRpcResponse<Response> rpcResp(resps.size());
   for (size_t i = 0; i < resps.size(); i++) {
-    auto& host = hosts->at(i);
+    const auto& host = hosts->at(i);
     folly::Try<StatusOr<Response>>& tryResp = resps[i];
     if (tryResp.hasException()) {
       std::string errMsg = tryResp.exception().what().toStdString();
-      rpcResp.markFailure();
+      rpcResp.markFailureUnsafe();
       LOG(ERROR) << "There some RPC errors: " << errMsg;
-      auto req = requests.at(host);
-      auto parts = getReqPartsId(req);
-      rpcResp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE);
+      const auto& req = requests.at(host);
+      const auto& parts = getReqPartsId(req);
+      rpcResp.appendFailedPartsUnsafe(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE);
     } else {
       StatusOr<Response> status = std::move(tryResp).value();
       if (status.ok()) {
         auto resp = std::move(status).value();
-        auto result = resp.get_result();
+        const auto& result = resp.get_result();
 
         if (!result.get_failed_parts().empty()) {
-          rpcResp.markFailure();
+          rpcResp.markFailureUnsafe();
           for (auto& part : result.get_failed_parts()) {
-            rpcResp.emplaceFailedPart(part.get_part_id(), part.get_code());
+            rpcResp.emplaceFailedPartUnsafe(part.get_part_id(), part.get_code());
           }
         }
 
         // Adjust the latency
         auto latency = result.get_latency_in_us();
-        rpcResp.setLatency(host, latency, totalLatencies->at(i));
+        rpcResp.setLatencyUnsafe(host, latency, totalLatencies->at(i));
         // Keep the response
-        rpcResp.addResponse(std::move(resp));
+        rpcResp.addResponseUnsafe(std::move(resp));
       } else {
-        rpcResp.markFailure();
+        rpcResp.markFailureUnsafe();
         Status s = std::move(status).status();
         nebula::cpp2::ErrorCode errorCode =
             s.code() == Status::Code::kGraphMemoryExceeded
                 ? nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED
                 : nebula::cpp2::ErrorCode::E_RPC_FAILURE;
         LOG(ERROR) << "There some RPC errors: " << s.message();
-        auto req = requests.at(host);
-        auto parts = getReqPartsId(req);
-        rpcResp.appendFailedParts(parts, errorCode);
+        const auto& req = requests.at(host);
+        const auto& parts = getReqPartsId(req);
+        rpcResp.appendFailedPartsUnsafe(parts, errorCode);
       }
     }
   }
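Note on correctness: the switch to the `*Unsafe` variants above relies on `collectResponse` being the only writer — the loop runs in the single continuation that fires after every per-host future has completed, so nothing else touches `rpcResp` while it is filled. A minimal sketch of that pattern with plain `std::future` and a hypothetical `AggregateResponse` type (not the folly/NebulaGraph code itself):

```cpp
#include <future>
#include <iostream>
#include <string>
#include <vector>

struct AggregateResponse {
  // Unlocked mutator: safe only while exactly one thread is aggregating.
  void addResponseUnsafe(std::string&& resp) {
    responses.push_back(std::move(resp));
  }
  std::vector<std::string> responses;
};

int main() {
  // Fan out one asynchronous "RPC" per host.
  std::vector<std::future<std::string>> futures;
  for (int i = 0; i < 4; ++i) {
    futures.push_back(std::async(std::launch::async, [i] {
      return "response from host " + std::to_string(i);
    }));
  }

  // Analogue of the collectAll(...) continuation: every future is awaited
  // first, then a single thread folds the results, so no lock is needed.
  AggregateResponse agg;
  for (auto& f : futures) {
    agg.addResponseUnsafe(f.get());
  }
  std::cout << agg.responses.size() << " responses collected\n";
}
```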
38 changes: 30 additions & 8 deletions src/clients/storage/StorageClientBase.h
@@ -35,6 +35,7 @@ class StorageRpcResponse final {
 
   explicit StorageRpcResponse(size_t reqsSent) : totalReqsSent_(reqsSent) {
     lock_ = std::make_unique<std::mutex>();
+    responses_.reserve(reqsSent);
   }
 
   bool succeeded() const {
@@ -47,46 +48,67 @@
     return maxLatency_;
   }
 
-  void setLatency(HostAddr host, int32_t latency, int32_t e2eLatency) {
-    std::lock_guard<std::mutex> g(*lock_);
+  void setLatencyUnsafe(const HostAddr& host, int32_t latency, int32_t e2eLatency) {
     if (latency > maxLatency_) {
       maxLatency_ = latency;
     }
     hostLatency_.emplace_back(std::make_tuple(host, latency, e2eLatency));
   }
 
-  void markFailure() {
+  void setLatency(const HostAddr& host, int32_t latency, int32_t e2eLatency) {
     std::lock_guard<std::mutex> g(*lock_);
+    setLatencyUnsafe(host, latency, e2eLatency);
+  }
+
+  void markFailureUnsafe() {
     result_ = Result::PARTIAL_SUCCEEDED;
     ++failedReqs_;
   }
 
+  void markFailure() {
+    std::lock_guard<std::mutex> g(*lock_);
+    markFailureUnsafe();
+  }
+
   // A value between [0, 100], representing a percentage
   int32_t completeness() const {
     std::lock_guard<std::mutex> g(*lock_);
     DCHECK_NE(totalReqsSent_, 0);
     return totalReqsSent_ == 0 ? 0 : (totalReqsSent_ - failedReqs_) * 100 / totalReqsSent_;
   }
 
-  void emplaceFailedPart(PartitionID partId, nebula::cpp2::ErrorCode errorCode) {
-    std::lock_guard<std::mutex> g(*lock_);
+  void emplaceFailedPartUnsafe(PartitionID partId, nebula::cpp2::ErrorCode errorCode) {
     failedParts_.emplace(partId, errorCode);
   }
 
-  void appendFailedParts(const std::vector<PartitionID>& partsId,
-                         nebula::cpp2::ErrorCode errorCode) {
+  void emplaceFailedPart(PartitionID partId, nebula::cpp2::ErrorCode errorCode) {
     std::lock_guard<std::mutex> g(*lock_);
+    emplaceFailedPartUnsafe(partId, errorCode);
+  }
+
+  void appendFailedPartsUnsafe(const std::vector<PartitionID>& partsId,
+                               nebula::cpp2::ErrorCode errorCode) {
     failedParts_.reserve(failedParts_.size() + partsId.size());
     for (const auto& partId : partsId) {
       failedParts_.emplace(partId, errorCode);
     }
   }
 
-  void addResponse(Response&& resp) {
+  void appendFailedParts(const std::vector<PartitionID>& partsId,
+                         nebula::cpp2::ErrorCode errorCode) {
     std::lock_guard<std::mutex> g(*lock_);
+    appendFailedPartsUnsafe(partsId, errorCode);
+  }
+
+  void addResponseUnsafe(Response&& resp) {
     responses_.emplace_back(std::move(resp));
   }
 
+  void addResponse(Response&& resp) {
+    std::lock_guard<std::mutex> g(*lock_);
+    addResponseUnsafe(std::move(resp));
+  }
+
   // Not thread-safe.
   const std::unordered_map<PartitionID, nebula::cpp2::ErrorCode>& failedParts() const {
     return failedParts_;
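The header keeps both entry points: each locked method is now a thin wrapper around an unlocked `*Unsafe` core, so callers that do share the object across threads still have a safe path, while the single-threaded aggregation in `collectResponse` skips the mutex. A toy illustration of the same split (a hypothetical `Results` class, not the real `StorageRpcResponse`):

```cpp
#include <mutex>
#include <thread>
#include <vector>

class Results {
 public:
  // Unlocked core: the caller must guarantee exclusive access.
  void addUnsafe(int v) {
    values_.push_back(v);
  }
  // Locked wrapper: safe for concurrent writers, delegates to the core.
  void add(int v) {
    std::lock_guard<std::mutex> g(mutex_);
    addUnsafe(v);
  }
  size_t size() const {
    return values_.size();
  }

 private:
  std::mutex mutex_;
  std::vector<int> values_;
};

int main() {
  Results r;
  // Concurrent writers go through the locked wrapper...
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i) {
    threads.emplace_back([&r, i] { r.add(i); });
  }
  for (auto& t : threads) {
    t.join();
  }
  // ...while a single aggregating thread may call the unlocked core directly.
  r.addUnsafe(42);
  return r.size() == 5 ? 0 : 1;
}
```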
21 changes: 8 additions & 13 deletions src/graph/context/iterator/PropIter.cpp
@@ -93,16 +93,13 @@ const Value& PropIter::getProp(const std::string& name, const std::string& prop)
   if (name == "*") {
     for (auto& index : propsMap) {
       auto propIndex = index.second.find(prop);
-      if (propIndex == index.second.end()) {
-        continue;
-      }
-      colId = propIndex->second;
-      DCHECK_GT(row.size(), colId);
-      auto& val = row[colId];
-      if (val.empty()) {
-        continue;
-      } else {
-        return val;
+      if (propIndex != index.second.end()) {
+        colId = propIndex->second;
+        DCHECK_GT(row.size(), colId);
+        auto& val = row[colId];
+        if (!val.empty()) {
+          return val;
+        }
       }
     }
     return Value::kNullValue;
@@ -153,9 +150,7 @@ Value PropIter::getVertex(const std::string& name) {
     Tag tag;
     tag.name = tagProp.first;
     for (auto& propIndex : tagProp.second) {
-      if (propIndex.first == nebula::kTag) {  // "_tag"
-        continue;
-      } else {
+      if (propIndex.first != nebula::kTag) {  // "_tag"
         tag.props.emplace(propIndex.first, row[propIndex.second]);
       }
     }
7 changes: 7 additions & 0 deletions src/graph/executor/Executor.h
@@ -123,6 +123,13 @@ class Executor : private boost::noncopyable, private cpp::NonMovable {
             class GatherFunc>
   auto runMultiJobs(ScatterFunc &&scatter, GatherFunc &&gather, Iterator *iter);
 
+  void addState(const std::string &name, size_t durationInUs) {
+    otherStats_.emplace(name, folly::sformat("{}(us)", durationInUs));
+  }
+  void addState(const std::string &name, folly::dynamic json) {
+    otherStats_.emplace(name, folly::toPrettyJson(json));
+  }
+
   int64_t id_;
 
   // Executor name
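These two `addState` overloads centralize the formatting that executors previously did inline via `otherStats_.emplace`: one stamps a duration as `N(us)`, the other pretty-prints a `folly::dynamic`. A standalone sketch of the same overloads, with a free-standing map standing in for `otherStats_` (assumes folly is available):

```cpp
#include <folly/Format.h>
#include <folly/dynamic.h>
#include <folly/json.h>

#include <iostream>
#include <map>
#include <string>

std::map<std::string, std::string> otherStats;

void addState(const std::string& name, size_t durationInUs) {
  // Durations are stored as "<n>(us)".
  otherStats.emplace(name, folly::sformat("{}(us)", durationInUs));
}

void addState(const std::string& name, folly::dynamic json) {
  // Structured stats are stored as pretty-printed JSON.
  otherStats.emplace(name, folly::toPrettyJson(json));
}

int main() {
  addState("total_rpc", 1234);  // -> "1234(us)"
  folly::dynamic step = folly::dynamic::object("rows", 42);
  addState("step[1]", std::move(step));  // -> pretty-printed JSON object
  for (const auto& [k, v] : otherStats) {
    std::cout << k << ": " << v << "\n";
  }
}
```

The remaining files in this diff switch their hand-rolled `folly::sformat` / `folly::toPrettyJson` call sites over to these helpers.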
5 changes: 2 additions & 3 deletions src/graph/executor/StorageAccessExecutor.h
@@ -148,12 +148,11 @@ class StorageAccessExecutor : public Executor {
   }
 
   template <typename RESP>
-  void addStats(storage::StorageRpcResponse<RESP> &resp,
-                std::unordered_map<std::string, std::string> &stats) const {
+  void addStats(storage::StorageRpcResponse<RESP> &resp) {
     auto &hostLatency = resp.hostLatency();
     for (size_t i = 0; i < hostLatency.size(); ++i) {
       auto info = util::collectRespProfileData(resp.responses()[i].get_result(), hostLatency[i]);
-      stats.emplace(folly::sformat("resp[{}]", i), folly::toPrettyJson(info));
+      addState(folly::sformat("resp[{}]", i), std::move(info));
     }
   }
 
44 changes: 19 additions & 25 deletions src/graph/executor/query/AppendVerticesExecutor.cpp
@@ -9,9 +9,12 @@
 using nebula::storage::StorageClient;
 using nebula::storage::StorageRpcResponse;
 using nebula::storage::cpp2::GetPropResponse;
 
+DECLARE_bool(optimize_appendvertices);
+
 namespace nebula {
 namespace graph {
+
 folly::Future<Status> AppendVerticesExecutor::execute() {
   return appendVertices();
 }
@@ -56,15 +59,12 @@ folly::Future<Status> AppendVerticesExecutor::appendVertices() {
           av->limit(qctx()),
           av->filter())
       .via(runner())
-      .ensure([this, getPropsTime]() {
-        SCOPED_TIMER(&execTime_);
-        otherStats_.emplace("total_rpc", folly::sformat("{}(us)", getPropsTime.elapsedInUSec()));
-      })
-      .thenValue([this](StorageRpcResponse<GetPropResponse> &&rpcResp) {
+      .thenValue([this, getPropsTime](StorageRpcResponse<GetPropResponse> &&rpcResp) {
         // MemoryTrackerVerified
         memory::MemoryCheckGuard guard;
         SCOPED_TIMER(&execTime_);
-        addStats(rpcResp, otherStats_);
+        addState("total_rpc", getPropsTime.elapsedInUSec());
+        addStats(rpcResp);
         if (FLAGS_max_job_size <= 1) {
           return folly::makeFuture<Status>(handleResp(std::move(rpcResp)));
         } else {
@@ -150,12 +150,11 @@ Status AppendVerticesExecutor::handleResp(
   bool mv = movable(av->inputVars().front());
   for (; inputIter->valid(); inputIter->next()) {
     auto dstFound = map.find(src->eval(ctx(inputIter.get())));
-    if (dstFound == map.end()) {
-      continue;
+    if (dstFound != map.end()) {
+      Row row = mv ? inputIter->moveRow() : *inputIter->row();
+      row.values.emplace_back(dstFound->second);
+      ds.rows.emplace_back(std::move(row));
     }
-    Row row = mv ? inputIter->moveRow() : *inputIter->row();
-    row.values.emplace_back(dstFound->second);
-    ds.rows.emplace_back(std::move(row));
   }
   return finish(ResultBuilder().value(Value(std::move(ds))).state(state).build());
 }
@@ -175,9 +174,7 @@ folly::Future<Status> AppendVerticesExecutor::handleRespMultiJobs(
     if (resp.props_ref().has_value()) {
       auto &&respV = std::move(*resp.props_ref());
       v.colNames = respV.colNames;
-      v.rows.insert(v.rows.end(),
-                    std::make_move_iterator(respV.begin()),
-                    std::make_move_iterator(respV.end()));
+      std::move(respV.begin(), respV.end(), std::back_inserter(v.rows));
     }
   }
   auto propIter = PropIter(std::make_shared<Value>(std::move(v)));
@@ -191,10 +188,9 @@
   auto gather = [this](auto &&results) -> Status {
     memory::MemoryCheckGuard guard;
     for (auto &r : results) {
+      NG_RETURN_IF_ERROR(r);
       auto &&rows = std::move(r).value();
-      result_.rows.insert(result_.rows.end(),
-                          std::make_move_iterator(rows.begin()),
-                          std::make_move_iterator(rows.end()));
+      std::move(rows.begin(), rows.end(), std::back_inserter(result_.rows));
     }
     return finish(ResultBuilder().value(Value(std::move(result_))).build());
   };
@@ -219,10 +215,9 @@
   auto gatherFinal = [this](auto &&results) -> Status {
     memory::MemoryCheckGuard guard2;
     for (auto &r : results) {
+      NG_RETURN_IF_ERROR(r);
      auto &&rows = std::move(r).value();
-      result_.rows.insert(result_.rows.end(),
-                          std::make_move_iterator(rows.begin()),
-                          std::make_move_iterator(rows.end()));
+      std::move(rows.begin(), rows.end(), std::back_inserter(result_.rows));
     }
     return finish(ResultBuilder().value(Value(std::move(result_))).build());
   };
@@ -280,12 +275,11 @@ DataSet AppendVerticesExecutor::handleJob(size_t begin, size_t end, Iterator *it
   QueryExpressionContext ctx(qctx()->ectx());
   for (; iter->valid() && begin++ < end; iter->next()) {
     auto dstFound = dsts_.find(src->eval(ctx(iter)));
-    if (dstFound == dsts_.end()) {
-      continue;
+    if (dstFound != dsts_.end()) {
+      Row row = *iter->row();
+      row.values.emplace_back(dstFound->second);
+      ds.rows.emplace_back(std::move(row));
     }
-    Row row = *iter->row();
-    row.values.emplace_back(dstFound->second);
-    ds.rows.emplace_back(std::move(row));
   }
 
   return ds;
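Two details in this file beyond the `addStats`/`addState` migration: the gather lambdas now validate each sub-job result with `NG_RETURN_IF_ERROR` before dereferencing it, and the `insert` + `make_move_iterator` idiom is replaced by the `std::move` algorithm, which moves the same elements with less ceremony. A self-contained comparison of the two idioms (plain `std::string` vectors rather than the `Row` type used here):

```cpp
#include <algorithm>
#include <iterator>
#include <string>
#include <vector>

int main() {
  // Idiom used before this change: range insert over move iterators.
  std::vector<std::string> src1 = {"a", "b"}, dst1;
  dst1.insert(dst1.end(),
              std::make_move_iterator(src1.begin()),
              std::make_move_iterator(src1.end()));

  // Idiom used after this change: the std::move algorithm with back_inserter.
  std::vector<std::string> src2 = {"a", "b"}, dst2;
  std::move(src2.begin(), src2.end(), std::back_inserter(dst2));

  return dst1 == dst2 ? 0 : 1;  // both destinations hold {"a", "b"}
}
```

One minor trade-off: range insert can size the destination in a single allocation, while `back_inserter` grows it one `push_back` at a time; for these row batches the difference is unlikely to matter.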
2 changes: 1 addition & 1 deletion src/graph/executor/query/GetEdgesExecutor.cpp
@@ -103,7 +103,7 @@ folly::Future<Status> GetEdgesExecutor::getEdges() {
       .thenValue([this, ge](StorageRpcResponse<GetPropResponse> &&rpcResp) {
         memory::MemoryCheckGuard guard;
         SCOPED_TIMER(&execTime_);
-        addStats(rpcResp, otherStats_);
+        addStats(rpcResp);
         return handleResp(std::move(rpcResp), ge->colNames());
       });
 }
2 changes: 1 addition & 1 deletion src/graph/executor/query/GetVerticesExecutor.cpp
@@ -53,7 +53,7 @@ folly::Future<Status> GetVerticesExecutor::getVertices() {
       .thenValue([this, gv](StorageRpcResponse<GetPropResponse> &&rpcResp) {
         memory::MemoryCheckGuard guard;
         SCOPED_TIMER(&execTime_);
-        addStats(rpcResp, otherStats_);
+        addStats(rpcResp);
         return handleResp(std::move(rpcResp), gv->colNames());
       });
 }
2 changes: 1 addition & 1 deletion src/graph/executor/query/IndexScanExecutor.cpp
@@ -44,7 +44,7 @@ folly::Future<Status> IndexScanExecutor::indexScan() {
       .thenValue([this](StorageRpcResponse<LookupIndexResp> &&rpcResp) {
         // MemoryTrackerVerified
         memory::MemoryCheckGuard guard;
-        addStats(rpcResp, otherStats_);
+        addStats(rpcResp);
         return handleResp(std::move(rpcResp));
       });
 }
2 changes: 1 addition & 1 deletion src/graph/executor/query/ScanEdgesExecutor.cpp
@@ -43,7 +43,7 @@ folly::Future<Status> ScanEdgesExecutor::scanEdges() {
       .thenValue([this](StorageRpcResponse<ScanResponse> &&rpcResp) {
         memory::MemoryCheckGuard guard;
         SCOPED_TIMER(&execTime_);
-        addStats(rpcResp, otherStats_);
+        addStats(rpcResp);
         return handleResp(std::move(rpcResp), {});
       });
 }
2 changes: 1 addition & 1 deletion src/graph/executor/query/ScanVerticesExecutor.cpp
@@ -44,7 +44,7 @@ folly::Future<Status> ScanVerticesExecutor::scanVertices() {
       .thenValue([this, sv](StorageRpcResponse<ScanResponse> &&rpcResp) {
         memory::MemoryCheckGuard guard;
         SCOPED_TIMER(&execTime_);
-        addStats(rpcResp, otherStats_);
+        addStats(rpcResp);
         return handleResp(std::move(rpcResp), sv->colNames());
       });
 }
9 changes: 4 additions & 5 deletions src/graph/executor/query/TraverseExecutor.cpp
@@ -107,7 +107,7 @@ folly::Future<Status> TraverseExecutor::getNeighbors() {
         addStats(resp, getNbrTime.elapsedInUSec());
         time::Duration expandTime;
         return handleResponse(std::move(resp)).ensure([this, expandTime]() {
-          otherStats_.emplace("expandTime", folly::sformat("{}(us)", expandTime.elapsedInUSec()));
+          addState("expandTime", expandTime.elapsedInUSec());
         });
       })
       .thenValue([this](Status s) -> folly::Future<Status> {
@@ -149,7 +149,7 @@ void TraverseExecutor::addStats(RpcResponse& resp, int64_t getNbrTimeInUSec) {
   folly::dynamic stepObj = folly::dynamic::object();
   stepObj.insert("storage", stepInfo);
   stepObj.insert("total_rpc_time", folly::sformat("{}(us)", getNbrTimeInUSec));
-  otherStats_.emplace(folly::sformat("step[{}]", currentStep_), folly::toPrettyJson(stepObj));
+  addState(folly::sformat("step[{}]", currentStep_), std::move(stepObj));
 }
 
 size_t TraverseExecutor::numRowsOfRpcResp(const RpcResponse& resps) const {
@@ -240,13 +240,12 @@ folly::Future<Status> TraverseExecutor::asyncExpandOneStep(RpcResponse&& resps)
       }
     }
 
-    auto t = postTaskTime.elapsedInUSec();
-    otherStats_.emplace("expandPostTaskTime", folly::sformat("{}(us)", t));
+    addState("expandPostTaskTime", postTaskTime.elapsedInUSec());
     folly::dynamic taskRunTimeArray = folly::dynamic::array();
     for (auto time : *taskRunTime) {
       taskRunTimeArray.push_back(time);
     }
-    otherStats_.emplace("expandTaskRunTime", folly::toPrettyJson(taskRunTimeArray));
+    addState("expandTaskRunTime", std::move(taskRunTimeArray));
 
     return Status::OK();
   });