Skip to content

Commit

Permalink
change finish()
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyu85cn committed Jan 17, 2022
1 parent 014e04e commit 5d81305
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 39 deletions.
4 changes: 2 additions & 2 deletions src/storage/transaction/ChainAddEdgesLocalProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,10 @@ void ChainAddEdgesLocalProcessor::doRpc(folly::Promise<Code>&& promise,
return;
}
auto* iClient = env_->interClient_;
auto* evb = env_->txnMan_->getEventBase();
// auto* evb = env_->txnMan_->getEventBase();
folly::Promise<Code> p;
auto f = p.getFuture();
iClient->chainAddEdges(req, term_, edgeVer_, std::move(p), evb);
iClient->chainAddEdges(req, term_, edgeVer_, std::move(p));

std::move(f).thenTry([=, p = std::move(promise)](auto&& t) mutable {
auto code = t.hasValue() ? t.value() : Code::E_RPC_FAILURE;
Expand Down
11 changes: 11 additions & 0 deletions src/storage/transaction/ChainAddEdgesLocalProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ class ChainAddEdgesLocalProcessor : public BaseProcessor<cpp2::ExecResponse>,
* */
void replaceNullWithDefaultValue(cpp2::AddEdgesRequest& req);

/**
* @brief check is an error code belongs to kv store
* we can do retry / recover if we meet a kv store error
* but if we meet a logical error (retry will alwasy failed)
* we should return error directly.
* @param code
* @return true
* @return false
*/
bool isKVStoreError(Code code);

std::string makeReadableEdge(const cpp2::AddEdgesRequest& req);

int64_t toInt(const ::nebula::Value& val);
Expand Down
4 changes: 2 additions & 2 deletions src/storage/transaction/ChainDeleteEdgesLocalProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,10 @@ void ChainDeleteEdgesLocalProcessor::doRpc(folly::Promise<Code>&& promise,
return;
}
auto* iClient = env_->interClient_;
auto* evb = env_->txnMan_->getEventBase();
// auto* evb = env_->txnMan_->getEventBase();
folly::Promise<Code> p;
auto f = p.getFuture();
iClient->chainDeleteEdges(req, txnId_, term_, std::move(p), evb);
iClient->chainDeleteEdges(req, txnId_, term_, std::move(p));

std::move(f).thenTry([=, p = std::move(promise)](auto&& t) mutable {
auto code = t.hasValue() ? t.value() : Code::E_RPC_FAILURE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ folly::SemiFuture<Code> ChainResumeUpdateDoublePrimeProcessor::processRemote(Cod

folly::SemiFuture<Code> ChainResumeUpdateDoublePrimeProcessor::processLocal(Code code) {
VLOG(1) << " processRemote(), code = " << apache::thrift::util::enumNameSafe(code);
rcRemote_ = code;
setErrorCode(code);

auto currTerm = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_);
Expand All @@ -53,7 +54,25 @@ folly::SemiFuture<Code> ChainResumeUpdateDoublePrimeProcessor::processLocal(Code
}

void ChainResumeUpdateDoublePrimeProcessor::finish() {
VLOG(1) << " commitLocal()=" << apache::thrift::util::enumNameSafe(code_);
VLOG(1) << " commitLocal()=" << apache::thrift::util::enumNameSafe(rcCommit_);
TermID currTerm = 0;
std::tie(currTerm, std::ignore) = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_);
if (term_ == currTerm) {
if (rcCommit_ == Code::SUCCEEDED && rcRemote_ == Code::SUCCEEDED) {
lk_->setAutoUnlock(true);
} else if (isKVStoreError(rcCommit_) || rcRemote_ == Code::E_RPC_FAILURE) {
reportFailed(ResumeType::RESUME_REMOTE);
} else {
// 1. rcCommit_ has some logical error (retry can't get any help)
// 2. rcRemote_ has some error other than RPC_FAILURE
// 2.1 we should do abort if this.
// 2.2 abort() should only have kv store error
lk_->setAutoUnlock(true);
}
} else {
// if term changed, transaction manager will do the clean.
}
pushResultCode(code_, req_.get_part_id());
finished_.setValue(code_);
onFinished();
}
Expand Down
6 changes: 0 additions & 6 deletions src/storage/transaction/ChainResumeUpdatePrimeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,5 @@ folly::SemiFuture<Code> ChainResumeUpdatePrimeProcessor::processLocal(Code code)
return code;
}

void ChainResumeUpdatePrimeProcessor::finish() {
VLOG(1) << "commitLocal()=" << apache::thrift::util::enumNameSafe(code_);
finished_.setValue(code_);
onFinished();
}

} // namespace storage
} // namespace nebula
2 changes: 0 additions & 2 deletions src/storage/transaction/ChainResumeUpdatePrimeProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ class ChainResumeUpdatePrimeProcessor : public ChainUpdateEdgeLocalProcessor {

folly::SemiFuture<nebula::cpp2::ErrorCode> processLocal(nebula::cpp2::ErrorCode code) override;

void finish() override;

virtual ~ChainResumeUpdatePrimeProcessor() = default;

protected:
Expand Down
71 changes: 46 additions & 25 deletions src/storage/transaction/ChainUpdateEdgeLocalProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,9 @@ folly::SemiFuture<Code> ChainUpdateEdgeLocalProcessor::processRemote(Code code)
return std::move(fut);
}

folly::SemiFuture<Code> ChainUpdateEdgeLocalProcessor::processLocal(Code rcRemote) {
VLOG(1) << " processRemote(): " << apache::thrift::util::enumNameSafe(rcRemote);
if (rcRemote != Code::SUCCEEDED && code_ == Code::SUCCEEDED) {
code_ = rcRemote;
}
folly::SemiFuture<Code> ChainUpdateEdgeLocalProcessor::processLocal(Code code) {
VLOG(1) << " processRemote(): " << apache::thrift::util::enumNameSafe(code);
rcRemote_ = code;

auto currTerm = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_);
if (currTerm.first != term_) {
Expand All @@ -94,9 +92,9 @@ folly::SemiFuture<Code> ChainUpdateEdgeLocalProcessor::processLocal(Code rcRemot
return code_;
}

if (rcRemote == Code::SUCCEEDED) {
if (code == Code::SUCCEEDED) {
erasePrime();
} else if (rcRemote == Code::E_RPC_FAILURE) {
} else if (code == Code::E_RPC_FAILURE) {
erasePrime();
appendDoublePrime();
} else {
Expand All @@ -106,19 +104,45 @@ folly::SemiFuture<Code> ChainUpdateEdgeLocalProcessor::processLocal(Code rcRemot
}
}

code_ = commit();
rcCommit_ = commit();

if (code_ != Code::SUCCEEDED) {
addUnfinishedEdge(ResumeType::RESUME_CHAIN);
reportFailed(ResumeType::RESUME_CHAIN);
} else {
if (rcRemote == Code::E_RPC_FAILURE) {
addUnfinishedEdge(ResumeType::RESUME_REMOTE);
if (code == Code::E_RPC_FAILURE) {
reportFailed(ResumeType::RESUME_REMOTE);
}
}

return code_;
}

void ChainUpdateEdgeLocalProcessor::finish() {
VLOG(1) << " commitLocal()=" << apache::thrift::util::enumNameSafe(rcCommit_);
TermID currTerm = 0;
std::tie(currTerm, std::ignore) = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_);
if (term_ == currTerm) {
if (rcCommit_ == Code::SUCCEEDED && rcRemote_ == Code::SUCCEEDED) {
lk_->setAutoUnlock(true);
} else if (isKVStoreError(rcCommit_)) {
reportFailed(ResumeType::RESUME_CHAIN);
} else if (rcRemote_ == Code::E_RPC_FAILURE) {
reportFailed(ResumeType::RESUME_REMOTE);
} else {
// 1. rcCommit_ has some logical error (retry can't get any help)
// 2. rcRemote_ has some error other than RPC_FAILURE
// 2.1 we should do abort if this.
// 2.2 abort() should only have kv store error
lk_->setAutoUnlock(true);
}
} else {
// if term changed, transaction manager will do the clean.
}
pushResultCode(code_, req_.get_part_id());
finished_.setValue(code_);
onFinished();
}

void ChainUpdateEdgeLocalProcessor::doRpc(folly::Promise<Code>&& promise, int retry) noexcept {
try {
if (retry > retryLimit_) {
Expand All @@ -129,9 +153,7 @@ void ChainUpdateEdgeLocalProcessor::doRpc(folly::Promise<Code>&& promise, int re
folly::Promise<Code> p;
auto reversedReq = reverseRequest(req_);

// auto* evb = env_->txnMan_->getEventBase();
auto f = p.getFuture();
// iClient->chainUpdateEdge(reversedReq, term_, ver_, std::move(p), evb);
iClient->chainUpdateEdge(reversedReq, term_, ver_, std::move(p));
std::move(f)
.thenTry([=, p = std::move(promise)](auto&& t) mutable {
Expand Down Expand Up @@ -189,23 +211,15 @@ std::string ChainUpdateEdgeLocalProcessor::sEdgeKey(const cpp2::UpdateEdgeReques
return ConsistUtil::edgeKey(spaceVidLen_, req.get_part_id(), req.get_edge_key());
}

void ChainUpdateEdgeLocalProcessor::finish() {
VLOG(1) << " commitLocal()=" << apache::thrift::util::enumNameSafe(code_);
pushResultCode(code_, req_.get_part_id());
onFinished();
}

void ChainUpdateEdgeLocalProcessor::abort() {
auto key = ConsistUtil::primeKey(spaceVidLen_, localPartId_, req_.get_edge_key());
kvErased_.emplace_back(std::move(key));

folly::Baton<true, std::atomic> baton;
env_->kvstore_->asyncMultiRemove(
req_.get_space_id(), req_.get_part_id(), std::move(kvErased_), [&](auto rc) mutable {
LOG(INFO) << " abort()=" << apache::thrift::util::enumNameSafe(rc);
if (rc != Code::SUCCEEDED) {
addUnfinishedEdge(ResumeType::RESUME_CHAIN);
}
VLOG(1) << " abort()=" << apache::thrift::util::enumNameSafe(rc);
rcCommit_ = rc;
baton.post();
});
baton.wait();
Expand Down Expand Up @@ -247,14 +261,21 @@ nebula::cpp2::ErrorCode ChainUpdateEdgeLocalProcessor::getErrorCode(
return parts.front().get_code();
}

void ChainUpdateEdgeLocalProcessor::addUnfinishedEdge(ResumeType type) {
VLOG(1) << "addUnfinishedEdge()";
void ChainUpdateEdgeLocalProcessor::reportFailed(ResumeType type) {
VLOG(1) << "reportFailed()";
if (lk_ != nullptr) {
lk_->setAutoUnlock(false);
}
auto key = ConsistUtil::edgeKey(spaceVidLen_, req_.get_part_id(), req_.get_edge_key());
env_->txnMan_->addPrime(spaceId_, localPartId_, term_, key, type);
}

bool ChainUpdateEdgeLocalProcessor::isKVStoreError(nebula::cpp2::ErrorCode code) {
auto iCode = static_cast<int>(code);
auto kvStoreErrorCodeBegin = static_cast<int>(nebula::cpp2::ErrorCode::E_RAFT_UNKNOWN_PART);
auto kvStoreErrorCodeEnd = static_cast<int>(nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED);
return iCode >= kvStoreErrorCodeBegin && iCode <= kvStoreErrorCodeEnd;
}

} // namespace storage
} // namespace nebula
4 changes: 3 additions & 1 deletion src/storage/transaction/ChainUpdateEdgeLocalProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,16 @@ class ChainUpdateEdgeLocalProcessor

Code commit();

void addUnfinishedEdge(ResumeType type);
void reportFailed(ResumeType type);

int64_t getVersion(const cpp2::UpdateEdgeRequest& req);

nebula::cpp2::ErrorCode getErrorCode(const cpp2::UpdateResponse& resp);

Code checkAndBuildContexts(const cpp2::UpdateEdgeRequest& req) override;

bool isKVStoreError(nebula::cpp2::ErrorCode code);

protected:
cpp2::UpdateEdgeRequest req_;
TransactionManager::SPtrLock lkCore_;
Expand Down

0 comments on commit 5d81305

Please sign in to comment.