From 593a823ebad4eeecd8524392cd4d74b10619a704 Mon Sep 17 00:00:00 2001
From: Doodle <13706157+critical27@users.noreply.github.com>
Date: Wed, 15 Sep 2021 12:24:27 +0800
Subject: [PATCH 1/4] cherry
---
 src/kvstore/raftex/RaftPart.cpp | 110 ++++++++++++++--------
 src/kvstore/wal/FileBasedWal.cpp | 9 ++
 src/kvstore/wal/FileBasedWal.h | 3 +
 src/kvstore/wal/Wal.h | 3 +
 src/kvstore/wal/test/FileBasedWalTest.cpp | 34 +++++++
 5 files changed, 121 insertions(+), 38 deletions(-)
diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index d610a8f1579..d847b58417e 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -1462,51 +1462,86 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
 }
 // req.get_last_log_id_sent() >= committedLogId_
- if (lastLogTerm_ > 0 && req.get_last_log_term_sent() != lastLogTerm_) {
- LOG(INFO) << idStr_ << "The local last log term is " << lastLogTerm_ << ", which is different from the leader's prevLogTerm " << req.get_last_log_term_sent() << ", the prevLogId is " << req.get_last_log_id_sent() << ". So need to rollback to last committedLogId_ " << committedLogId_;
- if (wal_->rollbackToLog(committedLogId_)) {
- lastLogId_ = wal_->lastLogId();
- lastLogTerm_ = wal_->lastLogTerm();
- resp.set_last_log_id(lastLogId_);
- resp.set_last_log_term(lastLogTerm_);
- LOG(INFO) << idStr_ << "Rollback succeeded! lastLogId is " << lastLogId_ << ", logLogTerm is " << lastLogTerm_ << ", committedLogId is " << committedLogId_ << ", term is " << term_;
- }
- resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
- return;
+ if (req.get_last_log_id_sent() == lastLogId_ && req.get_last_log_term_sent() == lastLogTerm_) {
+ // nothing to do
+ // just append log later
 } else if (req.get_last_log_id_sent() > lastLogId_) {
 // There is a gap
 LOG(INFO) << idStr_ << "Local is missing logs from id " << lastLogId_ << ". Need to catch up";
 resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
 return;
- } else if (req.get_last_log_id_sent() < lastLogId_) {
- // TODO(doodle): This is a potential bug which would cause data not in
- // consensus. In most case, we would hit this path when leader append logs
- // to follower and timeout (leader would set lastLogIdSent_ = logIdToSend_ -
- // 1 in Host). **But follower actually received it successfully**. Which
- // will explain when leader retry to append these logs, the LOG belows is
- // printed, and lastLogId_ == req.get_last_log_id_sent() + 1 in the LOG.
- //
- // In fact we should always rollback to req.get_last_log_id_sent(), and
- // append the logs from leader (we can't make promise that the logs in range
- // [req.get_last_log_id_sent() + 1, lastLogId_] is same with follower).
- // However, this makes no difference in the above case.
- LOG(INFO) << idStr_ << "Stale log! Local lastLogId " << lastLogId_ << ", lastLogTerm "
- << lastLogTerm_ << ", lastLogIdSent " << req.get_last_log_id_sent()
- << ", lastLogTermSent " << req.get_last_log_term_sent();
- resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE);
- return;
+ } else {
+ // check the term
+ int reqLastLogTerm = wal_->getLogTerm(req.get_last_log_id_sent());
+ if (req.get_last_log_term_sent() != reqLastLogTerm) {
+ LOG(INFO) << idStr_ << "The local log term is " << reqLastLogTerm
+ << ", which is different from the leader's prevLogTerm "
+ << req.get_last_log_term_sent() << ", the prevLogId is "
+ << req.get_last_log_id_sent() << ". So ask leader to send logs from committedLogId " << committedLogId_;
+ TermID committedLogTerm = wal_->getLogTerm(committedLogId_);
+ if (committedLogTerm > 0) {
+ resp.set_last_log_id(committedLogId_);
+ resp.set_last_log_term(committedLogTerm);
+ }
+ resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
+ return;
+ }
 }
- // Append new logs
+ // request get_last_log_term_sent == wal[get_last_log_id_sent].log_term
 size_t numLogs = req.get_log_str_list().size();
 LogID firstId = req.get_last_log_id_sent() + 1;
- VLOG(2) << idStr_ << "Writing log [" << firstId << ", " << firstId + numLogs - 1 << "] to WAL";
- LogStrListIterator iter(firstId, req.get_log_term(), req.get_log_str_list());
+
+ size_t diffIndex = 0;
+ // find the first id/term not match, rollback until it, and append the remaining wal
+ if (!(req.get_last_log_id_sent() == lastLogId_ && req.get_last_log_term_sent() == lastLogTerm_)) {
+ // check the diff index in log, find the first log which term is not same as term in request
+ {
+ std::unique_ptr it = wal_->iterator(firstId, firstId + numLogs);
+ for (size_t i = 0; i < numLogs && it->valid(); i++, ++(*it), diffIndex++) {
+ int logTerm = it->logTerm();
+ if (req.get_log_term() != logTerm) {
+ break;
+ }
+ }
+ }
+
+ // stale log
+ if (diffIndex == numLogs) {
+ // All logs have been received before
+ resp.set_last_log_id(firstId + numLogs - 1);
+ resp.set_last_log_term(req.get_log_term());
+ resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE);
+ // nothing to append
+ return;
+ }
+
+ // rollback the wal_
+ if (wal_->rollbackToLog(firstId + diffIndex - 1)) {
+ lastLogId_ = wal_->lastLogId();
+ lastLogTerm_ = wal_->lastLogTerm();
+ LOG(INFO) << idStr_ << "Rollback succeeded! lastLogId is " << lastLogId_ << ", logLogTerm is "
+ << lastLogTerm_ << ", committedLogId is " << committedLogId_ << ", term is "
+ << term_;
+ } else {
+ LOG(ERROR) << idStr_ << "Rollback fail! lastLogId is" << lastLogId_ << ", logLogTerm is "
+ << lastLogTerm_ << ", committedLogId is " << committedLogId_ << ", rollback id is "
+ << firstId + diffIndex - 1;
+ resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL);
+ return;
+ }
+
+ // update msg
+ firstId = firstId + diffIndex;
+ numLogs = numLogs - diffIndex;
+ }
+
+ // Append new logs
+ std::vector logEntries = std::vector(
+ std::make_move_iterator(req.get_log_str_list().begin() + diffIndex),
+ std::make_move_iterator(req.get_log_str_list().end()));
+ LogStrListIterator iter(firstId, req.get_log_term(), std::move(logEntries));
 if (wal_->appendLogs(iter)) {
 if (numLogs != 0) {
 CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId()) << "First Id is " << firstId;
@@ -1516,8 +1551,7 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
 resp.set_last_log_id(lastLogId_);
 resp.set_last_log_term(lastLogTerm_);
 } else {
- LOG_EVERY_N(WARNING, 100) << idStr_ << "Failed to append logs to WAL";
- resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL);
+ resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
 return;
 }
diff --git a/src/kvstore/wal/FileBasedWal.cpp b/src/kvstore/wal/FileBasedWal.cpp
index da7c9658a92..9ab1d197ae6 100644
--- a/src/kvstore/wal/FileBasedWal.cpp
+++ b/src/kvstore/wal/FileBasedWal.cpp
@@ -715,5 +715,14 @@ size_t FileBasedWal::accessAllWalInfo(std::function f
 return count;
 }
+TermID FileBasedWal::getLogTerm(LogID id) {
+ TermID term = -1;
+ auto iter = iterator(id, id);
+ if (iter->valid()) {
+ term = iter->logTerm();
+ }
+ return term;
+}
+
 } // namespace wal
 } // namespace nebula
diff --git a/src/kvstore/wal/FileBasedWal.h b/src/kvstore/wal/FileBasedWal.h
index 779958c1e69..efb650c0bf4 100644
--- a/src/kvstore/wal/FileBasedWal.h
+++ b/src/kvstore/wal/FileBasedWal.h
@@ -73,6 +73,9 @@ class FileBasedWal final : public Wal, public std::enable_shared_from_thislastLogId());
 }
+TEST(FileBasedWal, getLogTermTest) {
+ TempDir walDir("/tmp/testWal.XXXXXX");
+ FileBasedWalInfo info;
+ FileBasedWalPolicy policy;
+ policy.fileSize = 1024L * 1024L;
+ policy.bufferSize = 1024L * 1024L;
+
+ auto wal = FileBasedWal::getWal(
+ walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) {
+ return true;
+ });
+
+ // Append > 10MB logs in total
+ for (int i = 1; i <= 10000; i++) {
+ ASSERT_TRUE(
+ wal->appendLog(i /*id*/, i /*term*/, 0 /*cluster*/, folly::stringPrintf(kLongMsg, i)));
+ }
+
+ // in the memory buffer
+ ASSERT_EQ(10000, wal->getLogTerm(10000));
+ // in the file
+ ASSERT_EQ(4, wal->getLogTerm(4));
+
+ // Close the wal
+ wal.reset();
+
+ // Now let's open it to read
+ wal = FileBasedWal::getWal(
+ walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) {
+ return true;
+ });
+ EXPECT_EQ(10, wal->getLogTerm(10));
+}
+
 } // namespace wal
 } // namespace nebula
From ca9f60860fadcfaacb11b7a0fd2c051751146df9 Mon Sep 17 00:00:00 2001
From: Doodle <13706157+critical27@users.noreply.github.com>
Date: Sat, 18 Sep 2021 16:21:01 +0800
Subject: [PATCH 2/4] minor log changes, fix a part is reset more than expected during waiting for snapshot
---
 conf/nebula-storaged.conf.production | 2 +-
 src/kvstore/NebulaSnapshotManager.cpp | 4 ++--
 src/kvstore/raftex/Host.cpp | 32 ++++++++++++++++++++------
 src/kvstore/raftex/Host.h | 2 ++
 src/kvstore/raftex/RaftPart.cpp | 6 ++---
 src/kvstore/raftex/SnapshotManager.cpp | 8 +++++--
 src/kvstore/wal/FileBasedWal.cpp | 3 +--
 src/kvstore/wal/WalFileIterator.cpp | 4 ++--
 8 files changed, 42 insertions(+), 19 deletions(-)
diff --git
a/conf/nebula-storaged.conf.production b/conf/nebula-storaged.conf.production
index e0911a27a3a..0ba755fc189 100644
--- a/conf/nebula-storaged.conf.production
+++ b/conf/nebula-storaged.conf.production
@@ -100,7 +100,7 @@
 --enable_rocksdb_prefix_filtering=false
 ############### misc ####################
---snapshot_part_rate_limit=8388608
+--snapshot_part_rate_limit=10485760
 --snapshot_batch_size=1048576
 --rebuild_index_part_rate_limit=4194304
 --rebuild_index_batch_size=1048576
diff --git a/src/kvstore/NebulaSnapshotManager.cpp b/src/kvstore/NebulaSnapshotManager.cpp
index 4cbc6dadc32..5d31055be5e 100644
--- a/src/kvstore/NebulaSnapshotManager.cpp
+++ b/src/kvstore/NebulaSnapshotManager.cpp
@@ -11,7 +11,7 @@
 #include "kvstore/RateLimiter.h"
 DEFINE_uint32(snapshot_part_rate_limit,
- 1024 * 1024 * 2,
+ 1024 * 1024 * 10,
 "max bytes of pulling snapshot for each partition in one second");
 DEFINE_uint32(snapshot_batch_size, 1024 * 512, "batch size for snapshot, in bytes");
@@ -22,7 +22,7 @@ const int32_t kReserveNum = 1024 * 4;
 NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) {
 // Snapshot rate is limited to FLAGS_snapshot_worker_threads * FLAGS_snapshot_part_rate_limit.
- // So by default, the total send rate is limited to 4 * 2Mb = 8Mb.
+ // So by default, the total send rate is limited to 4 * 10Mb = 40Mb.
 LOG(INFO) << "Send snapshot is rate limited to " << FLAGS_snapshot_part_rate_limit << " for each part";
 }
diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp
index 5f42bb27bbe..33c4b0c4bbd 100644
--- a/src/kvstore/raftex/Host.cpp
+++ b/src/kvstore/raftex/Host.cpp
@@ -34,7 +34,8 @@ Host::Host(const HostAddr& addr, std::shared_ptr part, bool isLearner)
 isLearner_(isLearner),
 idStr_(folly::stringPrintf(
 "%s[Host: %s:%d] ", part_->idStr_.c_str(), addr_.host.c_str(), addr_.port)),
- cachingPromise_(folly::SharedPromise()) {}
+ cachingPromise_(folly::SharedPromise()),
+ rpcTimeout_(FLAGS_raft_rpc_timeout_ms) {}
 void Host::waitForStop() {
 std::unique_lock g(lock_);
@@ -156,19 +157,34 @@ void Host::setResponse(const cpp2::AppendLogResponse& r) {
 }
 void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr req) {
- sendAppendLogRequest(eb, std::move(req))
- .via(eb)
- .then([eb, self = shared_from_this()](folly::Try&& t) {
+ using TrasnportException = apache::thrift::transport::TTransportException;
+ sendAppendLogRequest(eb, req).via(eb).then(
+ [eb, self = shared_from_this(), req](folly::Try&& t) {
 VLOG(3) << self->idStr_ << "appendLogs() call got response";
 if (t.hasException()) {
+ auto tranxEx = dynamic_cast(t.exception().get_exception());
 VLOG(2) << self->idStr_ << t.exception().what();
 cpp2::AppendLogResponse r;
 r.set_error_code(cpp2::ErrorCode::E_EXCEPTION);
 {
 std::lock_guard g(self->lock_);
+ if (tranxEx != nullptr && tranxEx->getType() == TrasnportException::TIMED_OUT) {
+ VLOG(2) << self->idStr_ << "append log time out"
+ << ", space " << req->get_space() << ", part " << req->get_part()
+ << ", current term " << req->get_current_term() << ", last_log_id "
+ << req->get_last_log_id() << ", committed_id " << req->get_committed_log_id()
+ << ", last_log_term_sent" << req->get_last_log_term_sent()
+ << ", last_log_id_sent " << req->get_last_log_id_sent()
+ << ", set lastLogIdSent_ to logIdToSend_ " << self->logIdToSend_
+ << ", logs size " << req->get_log_str_list().size();
+ if ((self->rpcTimeout_ << 1) < FLAGS_raft_heartbeat_interval_secs * 1000) {
+ self->rpcTimeout_ <<= 1;
+ }
+ }
 self->setResponse(r);
 self->lastLogIdSent_ = self->logIdToSend_ - 1;
 }
+ // a new raft log or heartbeat will trigger another appendLogs in Host
 self->noMoreRequestCV_.notify_all();
 return;
 }
@@ -181,6 +197,7 @@ void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptrlastLogIdSent_ << ", lastLogTermSent_ " << self->lastLogTermSent_;
+ self->rpcTimeout_ = FLAGS_raft_rpc_timeout_ms;
 switch (resp.get_error_code()) {
 case cpp2::ErrorCode::SUCCEEDED: {
 VLOG(2) << self->idStr_ << "AppendLog request sent successfully";
@@ -444,11 +461,12 @@ folly::Future Host::sendAppendLogRequest(
 << ", part " << req->get_part() << ", current term " << req->get_current_term()
 << ", last_log_id " << req->get_last_log_id() << ", committed_id "
- << req->get_committed_log_id() << ", last_log_term_sent"
+ << req->get_committed_log_id() << ", last_log_term_sent "
 << req->get_last_log_term_sent() << ", last_log_id_sent "
- << req->get_last_log_id_sent();
+ << req->get_last_log_id_sent() << ", logs in request "
+ << req->get_log_str_list().size();
 // Get client connection
- auto client = part_->clientMan_->client(addr_, eb, false, FLAGS_raft_rpc_timeout_ms);
+ auto client = part_->clientMan_->client(addr_, eb, false, rpcTimeout_);
 return client->future_appendLog(*req);
 }
diff --git a/src/kvstore/raftex/Host.h b/src/kvstore/raftex/Host.h
index bda06d539d4..64c9fdeadcc 100644
--- a/src/kvstore/raftex/Host.h
+++ b/src/kvstore/raftex/Host.h
@@ -140,6 +140,8 @@ class Host final : public std::enable_shared_from_this {
 // CommittedLogId of follower
 LogID followerCommittedLogId_{0};
+
+ uint32_t rpcTimeout_;
 };
 } // namespace raftex
diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index d847b58417e..e73d2a838ff 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -1193,7 +1193,6 @@ void RaftPart::statusPolling(int64_t startTime) {
 sendHeartbeat();
 }
 if (needToCleanupSnapshot()) {
- LOG(INFO) << idStr_ << "Clean up the snapshot";
 cleanupSnapshot();
 }
 {
@@ -1399,6 +1398,7 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
 << " " << req.get_committed_log_id();
 reset();
 status_ = Status::WAITING_SNAPSHOT;
+ lastSnapshotRecvDur_.reset();
 resp.set_error_code(cpp2::ErrorCode::E_WAITING_SNAPSHOT);
 return;
 }
@@ -1522,8 +1522,8 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
 lastLogId_ = wal_->lastLogId();
 lastLogTerm_ = wal_->lastLogTerm();
 LOG(INFO) << idStr_ << "Rollback succeeded! lastLogId is " << lastLogId_ << ", logLogTerm is "
- << lastLogTerm_ << ", committedLogId is " << committedLogId_ << ", term is "
- << term_;
+ << lastLogTerm_ << ", committedLogId is " << committedLogId_ << ", logs in request "
+ << numLogs << ", remaining logs after rollback " << numLogs - diffIndex;
 } else {
 LOG(ERROR) << idStr_ << "Rollback fail! lastLogId is" << lastLogId_ << ", logLogTerm is " << lastLogTerm_ << ", committedLogId is " << committedLogId_ << ", rollback id is "
diff --git a/src/kvstore/raftex/SnapshotManager.cpp b/src/kvstore/raftex/SnapshotManager.cpp
index ea20c0a18bb..90a78931351 100644
--- a/src/kvstore/raftex/SnapshotManager.cpp
+++ b/src/kvstore/raftex/SnapshotManager.cpp
@@ -19,8 +19,12 @@ namespace nebula {
 namespace raftex {
 SnapshotManager::SnapshotManager() {
- executor_.reset(new folly::IOThreadPoolExecutor(FLAGS_snapshot_worker_threads));
- ioThreadPool_.reset(new folly::IOThreadPoolExecutor(FLAGS_snapshot_io_threads));
+ executor_.reset(new folly::IOThreadPoolExecutor(
+ FLAGS_snapshot_worker_threads,
+ std::make_shared("snapshot-worker")));
+ ioThreadPool_.reset(new folly::IOThreadPoolExecutor(
+ FLAGS_snapshot_io_threads,
+ std::make_shared("snapshot-ioexecutor")));
 }
 folly::Future SnapshotManager::sendSnapshot(std::shared_ptr part,
diff --git a/src/kvstore/wal/FileBasedWal.cpp b/src/kvstore/wal/FileBasedWal.cpp
index 9ab1d197ae6..fa081360581 100644
--- a/src/kvstore/wal/FileBasedWal.cpp
+++ b/src/kvstore/wal/FileBasedWal.cpp
@@ -354,7 +354,6 @@ void FileBasedWal::rollbackInFile(WalFileInfoPtr info, LogID logId) {
 }
 lastLogId_ = logId;
 lastLogTerm_ = term;
- LOG(INFO) << idStr_ << "Rollback to log " << logId;
 CHECK_GT(pos, 0) << "This wal should have been deleted";
 if (pos < FileUtils::fileSize(path)) {
@@ -632,7 +631,7 @@ bool FileBasedWal::reset() {
 std::vector files = FileUtils::listAllFilesInDir(dir_.c_str(), false, "*.wal");
 for (auto& fn : files) {
 auto absFn = FileUtils::joinPath(dir_, fn);
- LOG(INFO) << "Removing " << absFn;
+ VLOG(1) << "Removing " << absFn;
 unlink(absFn.c_str());
 }
 lastLogId_ = firstLogId_ = 0;
diff --git a/src/kvstore/wal/WalFileIterator.cpp b/src/kvstore/wal/WalFileIterator.cpp
index 2cd4f18a952..777735b56be 100644
--- a/src/kvstore/wal/WalFileIterator.cpp
+++ b/src/kvstore/wal/WalFileIterator.cpp
@@ -28,8 +28,8 @@ WalFileIterator::WalFileIterator(std::shared_ptr wal, LogID startI
 }
 if (startId < wal_->firstLogId()) {
- LOG(ERROR) << wal_->idStr_ << "The given log id " << startId
- << " is out of the range, the wal firstLogId is " << wal_->firstLogId();
+ VLOG(1) << wal_->idStr_ << "The given log id " << startId
+ << " is out of the range, the wal firstLogId is " << wal_->firstLogId();
 currId_ = lastId_ + 1;
 return;
 }
From 5fb2575ae3fa439c981b07bbd0f7a0b75b648b95 Mon Sep 17 00:00:00 2001
From: Doodle <13706157+critical27@users.noreply.github.com>
Date: Sat, 18 Sep 2021 17:10:20 +0800
Subject: [PATCH 3/4] remove useless rollback
---
 src/kvstore/raftex/RaftPart.cpp | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)
diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index e73d2a838ff..2dc8f2970e6 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -1326,7 +1326,7 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
 // Before change role from leader to follower, check the logs locally.
 if (role_ == Role::LEADER && wal_->lastLogId() > lastLogId_) {
- LOG(INFO) << idStr_ << "There is one log " << wal_->lastLogId()
+ LOG(INFO) << idStr_ << "There are some logs up to " << wal_->lastLogId()
 << " i did not commit when i was leader, rollback to " << lastLogId_;
 wal_->rollbackToLog(lastLogId_);
 }
@@ -1649,12 +1649,6 @@ cpp2::ErrorCode RaftPart::verifyLeader(const REQ& req) {
 votedAddr_ = HostAddr("", 0);
 weight_ = 1;
 isBlindFollower_ = false;
- // Before accept the logs from the new leader, check the logs locally.
- if (wal_->lastLogId() > lastLogId_) {
- LOG(INFO) << idStr_ << "There is one log " << wal_->lastLogId()
- << " i did not commit when i was leader, rollback to " << lastLogId_;
- wal_->rollbackToLog(lastLogId_);
- }
 if (oldRole == Role::LEADER) {
 // Need to invoke onLostLeadership callback
 bgWorkers_->addTask([self = shared_from_this(), oldTerm] { self->onLostLeadership(oldTerm); });
From 15f514115890799024b8873ff70e84801a074f72 Mon Sep 17 00:00:00 2001
From: Doodle <13706157+critical27@users.noreply.github.com>
Date: Sat, 18 Sep 2021 17:11:37 +0800
Subject: [PATCH 4/4] optimize when rpc timeout
---
 src/clients/storage/StorageClientBase-inl.h | 92 ++++++++++++---------
 src/kvstore/raftex/Host.cpp | 78 +++++++++--------
 2 files changed, 99 insertions(+), 71 deletions(-)
diff --git a/src/clients/storage/StorageClientBase-inl.h b/src/clients/storage/StorageClientBase-inl.h
index 608639bf471..749563a8879 100644
--- a/src/clients/storage/StorageClientBase-inl.h
+++ b/src/clients/storage/StorageClientBase-inl.h
@@ -115,6 +115,7 @@ folly::SemiFuture> StorageClientBase::c
 folly::EventBase* evb,
 std::unordered_map requests,
 RemoteFunc&& remoteFunc) {
+ using TransportException = apache::thrift::transport::TTransportException;
 auto context = std::make_shared>(
 requests.size(), std::move(remoteFunc));
@@ -137,49 +138,64 @@ folly::SemiFuture> StorageClientBase::c
 // Since all requests are sent using the same eventbase, all
 // then-callback will be executed on the same IO thread
 .via(evb)
- .then([this, context, host, spaceId, start](folly::Try&& val) {
- auto& r = context->findRequest(host);
- if (val.hasException()) {
- LOG(ERROR) << "Request to " << host << " failed: " << val.exception().what();
- auto parts = getReqPartsId(r);
- context->resp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE);
- invalidLeader(spaceId, parts);
- context->resp.markFailure();
- } else {
- auto resp = std::move(val.value());
- auto& result = resp.get_result();
- bool hasFailure{false};
- for (auto& code : result.get_failed_parts()) {
- VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code "
- << static_cast(code.get_code());
- hasFailure = true;
- context->resp.emplaceFailedPart(code.get_part_id(), code.get_code());
- if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
- auto* leader = code.get_leader();
- if (isValidHostPtr(leader)) {
- updateLeader(spaceId, code.get_part_id(), *leader);
- } else {
- invalidLeader(spaceId, code.get_part_id());
- }
- } else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND ||
- code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) {
- invalidLeader(spaceId, code.get_part_id());
+ .thenValue([this, context, host, spaceId, start](Response&& resp) {
+ auto& result = resp.get_result();
+ bool hasFailure{false};
+ for (auto& code : result.get_failed_parts()) {
+ VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code "
+ << static_cast(code.get_code());
+ hasFailure = true;
+ context->resp.emplaceFailedPart(code.get_part_id(), code.get_code());
+ if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
+ auto* leader = code.get_leader();
+ if (isValidHostPtr(leader)) {
+ updateLeader(spaceId, code.get_part_id(), *leader);
+ } else {
+ invalidLeader(spaceId, code.get_part_id());
+ }
+ } else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND ||
+ code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) {
+ invalidLeader(spaceId, code.get_part_id());
+ } else {
+ // do nothing
+ }
 }
- if (hasFailure) {
- context->resp.markFailure();
- }
-
- // Adjust the latency
- auto latency = result.get_latency_in_us();
- context->resp.setLatency(host, latency, time::WallClock::fastNowInMicroSec() - start);
-
- // Keep the response
- context->resp.addResponse(std::move(resp));
+ if (hasFailure) {
+ context->resp.markFailure();
 }
+ // Adjust the latency
+ auto latency = result.get_latency_in_us();
+ context->resp.setLatency(host, latency, time::WallClock::fastNowInMicroSec() - start);
+
+ // Keep the response
+ context->resp.addResponse(std::move(resp));
+ })
+ .thenError(folly::tag_t{},
+ [this, context, host, spaceId](TransportException&& ex) {
+ auto& r = context->findRequest(host);
+ auto parts = getReqPartsId(r);
+ if (ex.getType() == TransportException::TIMED_OUT) {
+ LOG(ERROR) << "Request to " << host << " time out: " << ex.what();
+ } else {
+ invalidLeader(spaceId, parts);
+ LOG(ERROR) << "Request to " << host << " failed: " << ex.what();
+ }
+ context->resp.appendFailedParts(parts,
+ nebula::cpp2::ErrorCode::E_RPC_FAILURE);
+ context->resp.markFailure();
+ })
+ .thenError(folly::tag_t{},
+ [this, context, host, spaceId](std::exception&& ex) {
+ auto& r = context->findRequest(host);
+ auto parts = getReqPartsId(r);
+ LOG(ERROR) << "Request to " << host << " failed: " << ex.what();
+ invalidLeader(spaceId, parts);
+ context->resp.appendFailedParts(parts,
+ nebula::cpp2::ErrorCode::E_RPC_FAILURE);
+ context->resp.markFailure();
+ })
+ .ensure([context, host] {
 if (context->removeRequest(host)) {
 // Received all responses
 context->promise.setValue(std::move(context->resp));
diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp
index 33c4b0c4bbd..b3d6126954f 100644
--- a/src/kvstore/raftex/Host.cpp
+++ b/src/kvstore/raftex/Host.cpp
@@ -157,39 +157,10 @@ void Host::setResponse(const cpp2::AppendLogResponse& r) {
 }
 void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr req) {
- using TrasnportException = apache::thrift::transport::TTransportException;
- sendAppendLogRequest(eb, req).via(eb).then(
- [eb, self = shared_from_this(), req](folly::Try&& t) {
- VLOG(3) << self->idStr_ << "appendLogs() call got response";
- if (t.hasException()) {
- auto tranxEx = dynamic_cast(t.exception().get_exception());
- VLOG(2) << self->idStr_ << t.exception().what();
- cpp2::AppendLogResponse r;
- r.set_error_code(cpp2::ErrorCode::E_EXCEPTION);
- {
- std::lock_guard g(self->lock_);
- if (tranxEx != nullptr && tranxEx->getType() == TrasnportException::TIMED_OUT) {
- VLOG(2) << self->idStr_ << "append log time out"
- << ", space " << req->get_space() << ", part " << req->get_part()
- << ", current term " << req->get_current_term() << ", last_log_id "
- << req->get_last_log_id() << ", committed_id " << req->get_committed_log_id()
- << ", last_log_term_sent" << req->get_last_log_term_sent()
- << ", last_log_id_sent " << req->get_last_log_id_sent()
- << ", set lastLogIdSent_ to logIdToSend_ " << self->logIdToSend_
- << ", logs size " << req->get_log_str_list().size();
- if ((self->rpcTimeout_ << 1) < FLAGS_raft_heartbeat_interval_secs * 1000) {
- self->rpcTimeout_ <<= 1;
- }
- }
- self->setResponse(r);
- self->lastLogIdSent_ = self->logIdToSend_ - 1;
- }
- // a new raft log or heartbeat will trigger another appendLogs in Host
- self->noMoreRequestCV_.notify_all();
- return;
- }
-
- cpp2::AppendLogResponse resp = std::move(t).value();
+ using TransportException = apache::thrift::transport::TTransportException;
+ sendAppendLogRequest(eb, req)
+ .via(eb)
+ .thenValue([eb, self = shared_from_this()](cpp2::AppendLogResponse&& resp) {
 LOG_IF(INFO, FLAGS_trace_raft)
 << self->idStr_ << "AppendLogResponse "
 << "code " << apache::thrift::util::enumNameSafe(resp.get_error_code()) << ", currTerm "
@@ -375,6 +346,47 @@ void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr{},
+ [self = shared_from_this(), req](TransportException&& ex) {
+ VLOG(2) << self->idStr_ << ex.what();
+ cpp2::AppendLogResponse r;
+ r.set_error_code(cpp2::ErrorCode::E_EXCEPTION);
+ {
+ std::lock_guard g(self->lock_);
+ if (ex.getType() == TransportException::TIMED_OUT) {
+ VLOG(2) << self->idStr_ << "append log time out"
+ << ", space " << req->get_space() << ", part " << req->get_part()
+ << ", current term " << req->get_current_term() << ", last_log_id "
+ << req->get_last_log_id() << ", committed_id "
+ << req->get_committed_log_id() << ", last_log_term_sent"
+ << req->get_last_log_term_sent() << ", last_log_id_sent "
+ << req->get_last_log_id_sent()
+ << ", set lastLogIdSent_ to logIdToSend_ " << self->logIdToSend_
+ << ", logs size " << req->get_log_str_list().size();
+ if ((self->rpcTimeout_ << 1) < FLAGS_raft_heartbeat_interval_secs * 1000) {
+ self->rpcTimeout_ <<= 1;
+ }
+ }
+ self->setResponse(r);
+ self->lastLogIdSent_ = self->logIdToSend_ - 1;
+ }
+ // a new raft log or heartbeat will trigger another appendLogs in Host
+ self->noMoreRequestCV_.notify_all();
+ return;
+ })
+ .thenError(folly::tag_t{}, [self = shared_from_this()](std::exception&& ex) {
+ VLOG(2) << self->idStr_ << ex.what();
+ cpp2::AppendLogResponse r;
+ r.set_error_code(cpp2::ErrorCode::E_EXCEPTION);
+ {
+ std::lock_guard g(self->lock_);
+ self->setResponse(r);
+ self->lastLogIdSent_ = self->logIdToSend_ - 1;
+ }
+ // a new raft log or heartbeat will trigger another appendLogs in Host
+ self->noMoreRequestCV_.notify_all();
+ return;
 });
 }