diff --git a/src/storage/transaction/ChainAddEdgesLocalProcessor.cpp b/src/storage/transaction/ChainAddEdgesLocalProcessor.cpp index b91901d825b..7b5f6b3622e 100644 --- a/src/storage/transaction/ChainAddEdgesLocalProcessor.cpp +++ b/src/storage/transaction/ChainAddEdgesLocalProcessor.cpp @@ -204,10 +204,10 @@ void ChainAddEdgesLocalProcessor::doRpc(folly::Promise&& promise, return; } auto* iClient = env_->interClient_; - auto* evb = env_->txnMan_->getEventBase(); + // auto* evb = env_->txnMan_->getEventBase(); folly::Promise p; auto f = p.getFuture(); - iClient->chainAddEdges(req, term_, edgeVer_, std::move(p), evb); + iClient->chainAddEdges(req, term_, edgeVer_, std::move(p)); std::move(f).thenTry([=, p = std::move(promise)](auto&& t) mutable { auto code = t.hasValue() ? t.value() : Code::E_RPC_FAILURE; diff --git a/src/storage/transaction/ChainAddEdgesLocalProcessor.h b/src/storage/transaction/ChainAddEdgesLocalProcessor.h index 40ab3b5915f..1cfc4b3272d 100644 --- a/src/storage/transaction/ChainAddEdgesLocalProcessor.h +++ b/src/storage/transaction/ChainAddEdgesLocalProcessor.h @@ -119,6 +119,17 @@ class ChainAddEdgesLocalProcessor : public BaseProcessor, * */ void replaceNullWithDefaultValue(cpp2::AddEdgesRequest& req); + /** + * @brief check whether an error code belongs to the kv store + * we can do retry / recover if we meet a kv store error + * but if we meet a logical error (retry will always fail) + * we should return the error directly. 
+ * @param code + * @return true + * @return false + */ + bool isKVStoreError(Code code); + std::string makeReadableEdge(const cpp2::AddEdgesRequest& req); int64_t toInt(const ::nebula::Value& val); diff --git a/src/storage/transaction/ChainDeleteEdgesLocalProcessor.cpp b/src/storage/transaction/ChainDeleteEdgesLocalProcessor.cpp index 809d58e6355..b48ef27c4bc 100644 --- a/src/storage/transaction/ChainDeleteEdgesLocalProcessor.cpp +++ b/src/storage/transaction/ChainDeleteEdgesLocalProcessor.cpp @@ -215,10 +215,10 @@ void ChainDeleteEdgesLocalProcessor::doRpc(folly::Promise&& promise, return; } auto* iClient = env_->interClient_; - auto* evb = env_->txnMan_->getEventBase(); + // auto* evb = env_->txnMan_->getEventBase(); folly::Promise p; auto f = p.getFuture(); - iClient->chainDeleteEdges(req, txnId_, term_, std::move(p), evb); + iClient->chainDeleteEdges(req, txnId_, term_, std::move(p)); std::move(f).thenTry([=, p = std::move(promise)](auto&& t) mutable { auto code = t.hasValue() ? t.value() : Code::E_RPC_FAILURE; diff --git a/src/storage/transaction/ChainResumeUpdateDoublePrimeProcessor.cpp b/src/storage/transaction/ChainResumeUpdateDoublePrimeProcessor.cpp index 65632b9a81b..046cd21cc0c 100644 --- a/src/storage/transaction/ChainResumeUpdateDoublePrimeProcessor.cpp +++ b/src/storage/transaction/ChainResumeUpdateDoublePrimeProcessor.cpp @@ -30,6 +30,7 @@ folly::SemiFuture ChainResumeUpdateDoublePrimeProcessor::processRemote(Cod folly::SemiFuture ChainResumeUpdateDoublePrimeProcessor::processLocal(Code code) { VLOG(1) << " processRemote(), code = " << apache::thrift::util::enumNameSafe(code); + rcRemote_ = code; setErrorCode(code); auto currTerm = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); @@ -53,7 +54,25 @@ folly::SemiFuture ChainResumeUpdateDoublePrimeProcessor::processLocal(Code } void ChainResumeUpdateDoublePrimeProcessor::finish() { - VLOG(1) << " commitLocal()=" << apache::thrift::util::enumNameSafe(code_); + VLOG(1) << " commitLocal()=" << 
apache::thrift::util::enumNameSafe(rcCommit_); + TermID currTerm = 0; + std::tie(currTerm, std::ignore) = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); + if (term_ == currTerm) { + if (rcCommit_ == Code::SUCCEEDED && rcRemote_ == Code::SUCCEEDED) { + lk_->setAutoUnlock(true); + } else if (isKVStoreError(rcCommit_) || rcRemote_ == Code::E_RPC_FAILURE) { + reportFailed(ResumeType::RESUME_REMOTE); + } else { + // 1. rcCommit_ has some logical error (retrying will not help) + // 2. rcRemote_ has some error other than RPC_FAILURE + // 2.1 we should abort in this case. + // 2.2 abort() should only fail with a kv store error + lk_->setAutoUnlock(true); + } + } else { + // if the term changed, the transaction manager will do the cleanup. + } + pushResultCode(code_, req_.get_part_id()); finished_.setValue(code_); onFinished(); } diff --git a/src/storage/transaction/ChainResumeUpdatePrimeProcessor.cpp b/src/storage/transaction/ChainResumeUpdatePrimeProcessor.cpp index 7f3f582d605..f68222ee01e 100644 --- a/src/storage/transaction/ChainResumeUpdatePrimeProcessor.cpp +++ b/src/storage/transaction/ChainResumeUpdatePrimeProcessor.cpp @@ -56,11 +56,5 @@ folly::SemiFuture ChainResumeUpdatePrimeProcessor::processLocal(Code code) return code; } -void ChainResumeUpdatePrimeProcessor::finish() { - VLOG(1) << "commitLocal()=" << apache::thrift::util::enumNameSafe(code_); - finished_.setValue(code_); - onFinished(); -} - } // namespace storage } // namespace nebula diff --git a/src/storage/transaction/ChainResumeUpdatePrimeProcessor.h b/src/storage/transaction/ChainResumeUpdatePrimeProcessor.h index a1ecc68d2bb..bf13906f5bc 100644 --- a/src/storage/transaction/ChainResumeUpdatePrimeProcessor.h +++ b/src/storage/transaction/ChainResumeUpdatePrimeProcessor.h @@ -27,8 +27,6 @@ class ChainResumeUpdatePrimeProcessor : public ChainUpdateEdgeLocalProcessor { folly::SemiFuture processLocal(nebula::cpp2::ErrorCode code) override; - void finish() override; - virtual ~ChainResumeUpdatePrimeProcessor() 
= default; protected: diff --git a/src/storage/transaction/ChainUpdateEdgeLocalProcessor.cpp b/src/storage/transaction/ChainUpdateEdgeLocalProcessor.cpp index 80c35c9fc07..8615cb65313 100644 --- a/src/storage/transaction/ChainUpdateEdgeLocalProcessor.cpp +++ b/src/storage/transaction/ChainUpdateEdgeLocalProcessor.cpp @@ -81,11 +81,9 @@ folly::SemiFuture ChainUpdateEdgeLocalProcessor::processRemote(Code code) return std::move(fut); } -folly::SemiFuture ChainUpdateEdgeLocalProcessor::processLocal(Code rcRemote) { - VLOG(1) << " processRemote(): " << apache::thrift::util::enumNameSafe(rcRemote); - if (rcRemote != Code::SUCCEEDED && code_ == Code::SUCCEEDED) { - code_ = rcRemote; - } +folly::SemiFuture ChainUpdateEdgeLocalProcessor::processLocal(Code code) { + VLOG(1) << " processRemote(): " << apache::thrift::util::enumNameSafe(code); + rcRemote_ = code; auto currTerm = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); if (currTerm.first != term_) { @@ -94,9 +92,9 @@ folly::SemiFuture ChainUpdateEdgeLocalProcessor::processLocal(Code rcRemot return code_; } - if (rcRemote == Code::SUCCEEDED) { + if (code == Code::SUCCEEDED) { erasePrime(); - } else if (rcRemote == Code::E_RPC_FAILURE) { + } else if (code == Code::E_RPC_FAILURE) { erasePrime(); appendDoublePrime(); } else { @@ -106,19 +104,45 @@ folly::SemiFuture ChainUpdateEdgeLocalProcessor::processLocal(Code rcRemot } } - code_ = commit(); + rcCommit_ = commit(); if (code_ != Code::SUCCEEDED) { - addUnfinishedEdge(ResumeType::RESUME_CHAIN); + reportFailed(ResumeType::RESUME_CHAIN); } else { - if (rcRemote == Code::E_RPC_FAILURE) { - addUnfinishedEdge(ResumeType::RESUME_REMOTE); + if (code == Code::E_RPC_FAILURE) { + reportFailed(ResumeType::RESUME_REMOTE); } } return code_; } +void ChainUpdateEdgeLocalProcessor::finish() { + VLOG(1) << " commitLocal()=" << apache::thrift::util::enumNameSafe(rcCommit_); + TermID currTerm = 0; + std::tie(currTerm, std::ignore) = env_->txnMan_->getTermFromKVStore(spaceId_, 
localPartId_); + if (term_ == currTerm) { + if (rcCommit_ == Code::SUCCEEDED && rcRemote_ == Code::SUCCEEDED) { + lk_->setAutoUnlock(true); + } else if (isKVStoreError(rcCommit_)) { + reportFailed(ResumeType::RESUME_CHAIN); + } else if (rcRemote_ == Code::E_RPC_FAILURE) { + reportFailed(ResumeType::RESUME_REMOTE); + } else { + // 1. rcCommit_ has some logical error (retrying will not help) + // 2. rcRemote_ has some error other than RPC_FAILURE + // 2.1 we should abort in this case. + // 2.2 abort() should only fail with a kv store error + lk_->setAutoUnlock(true); + } + } else { + // if the term changed, the transaction manager will do the cleanup. + } + pushResultCode(code_, req_.get_part_id()); + finished_.setValue(code_); + onFinished(); +} + void ChainUpdateEdgeLocalProcessor::doRpc(folly::Promise&& promise, int retry) noexcept { try { if (retry > retryLimit_) { @@ -129,9 +153,7 @@ void ChainUpdateEdgeLocalProcessor::doRpc(folly::Promise&& promise, int re folly::Promise p; auto reversedReq = reverseRequest(req_); - // auto* evb = env_->txnMan_->getEventBase(); auto f = p.getFuture(); - // iClient->chainUpdateEdge(reversedReq, term_, ver_, std::move(p), evb); iClient->chainUpdateEdge(reversedReq, term_, ver_, std::move(p)); std::move(f) .thenTry([=, p = std::move(promise)](auto&& t) mutable { @@ -189,12 +211,6 @@ std::string ChainUpdateEdgeLocalProcessor::sEdgeKey(const cpp2::UpdateEdgeReques return ConsistUtil::edgeKey(spaceVidLen_, req.get_part_id(), req.get_edge_key()); } -void ChainUpdateEdgeLocalProcessor::finish() { - VLOG(1) << " commitLocal()=" << apache::thrift::util::enumNameSafe(code_); - pushResultCode(code_, req_.get_part_id()); - onFinished(); -} - void ChainUpdateEdgeLocalProcessor::abort() { auto key = ConsistUtil::primeKey(spaceVidLen_, localPartId_, req_.get_edge_key()); kvErased_.emplace_back(std::move(key)); @@ -202,10 +218,8 @@ void ChainUpdateEdgeLocalProcessor::abort() { folly::Baton baton; env_->kvstore_->asyncMultiRemove( req_.get_space_id(), 
req_.get_part_id(), std::move(kvErased_), [&](auto rc) mutable { - LOG(INFO) << " abort()=" << apache::thrift::util::enumNameSafe(rc); - if (rc != Code::SUCCEEDED) { - addUnfinishedEdge(ResumeType::RESUME_CHAIN); - } + VLOG(1) << " abort()=" << apache::thrift::util::enumNameSafe(rc); + rcCommit_ = rc; baton.post(); }); baton.wait(); @@ -247,8 +261,8 @@ nebula::cpp2::ErrorCode ChainUpdateEdgeLocalProcessor::getErrorCode( return parts.front().get_code(); } -void ChainUpdateEdgeLocalProcessor::addUnfinishedEdge(ResumeType type) { - VLOG(1) << "addUnfinishedEdge()"; +void ChainUpdateEdgeLocalProcessor::reportFailed(ResumeType type) { + VLOG(1) << "reportFailed()"; if (lk_ != nullptr) { lk_->setAutoUnlock(false); } @@ -256,5 +270,12 @@ void ChainUpdateEdgeLocalProcessor::addUnfinishedEdge(ResumeType type) { env_->txnMan_->addPrime(spaceId_, localPartId_, term_, key, type); } +bool ChainUpdateEdgeLocalProcessor::isKVStoreError(nebula::cpp2::ErrorCode code) { + auto iCode = static_cast(code); + auto kvStoreErrorCodeBegin = static_cast(nebula::cpp2::ErrorCode::E_RAFT_UNKNOWN_PART); + auto kvStoreErrorCodeEnd = static_cast(nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED); + return iCode >= kvStoreErrorCodeBegin && iCode <= kvStoreErrorCodeEnd; +} + } // namespace storage } // namespace nebula diff --git a/src/storage/transaction/ChainUpdateEdgeLocalProcessor.h b/src/storage/transaction/ChainUpdateEdgeLocalProcessor.h index 87d88a7bdcb..0cc65d0b252 100644 --- a/src/storage/transaction/ChainUpdateEdgeLocalProcessor.h +++ b/src/storage/transaction/ChainUpdateEdgeLocalProcessor.h @@ -65,7 +65,7 @@ class ChainUpdateEdgeLocalProcessor Code commit(); - void addUnfinishedEdge(ResumeType type); + void reportFailed(ResumeType type); int64_t getVersion(const cpp2::UpdateEdgeRequest& req); @@ -73,6 +73,8 @@ class ChainUpdateEdgeLocalProcessor Code checkAndBuildContexts(const cpp2::UpdateEdgeRequest& req) override; + bool isKVStoreError(nebula::cpp2::ErrorCode code); + protected: 
cpp2::UpdateEdgeRequest req_; TransactionManager::SPtrLock lkCore_;