diff --git a/conf/nebula-storaged.conf.production b/conf/nebula-storaged.conf.production index f41a16b7d21..8789ebdd08a 100644 --- a/conf/nebula-storaged.conf.production +++ b/conf/nebula-storaged.conf.production @@ -102,7 +102,7 @@ --enable_rocksdb_whole_key_filtering=false ############### misc #################### ---snapshot_part_rate_limit=8388608 +--snapshot_part_rate_limit=10485760 --snapshot_batch_size=1048576 --rebuild_index_part_rate_limit=4194304 --rebuild_index_batch_size=1048576 diff --git a/src/common/base/SlowOpTracker.cpp b/src/common/base/SlowOpTracker.cpp index 5a81c7db52d..debdaa048c2 100644 --- a/src/common/base/SlowOpTracker.cpp +++ b/src/common/base/SlowOpTracker.cpp @@ -8,4 +8,4 @@ #include "common/base/Base.h" #include "common/time/WallClock.h" -DEFINE_int64(slow_op_threshhold_ms, 50, "default threshhold for slow operation"); +DEFINE_int64(slow_op_threshhold_ms, 100, "default threshhold for slow operation"); diff --git a/src/interface/raftex.thrift b/src/interface/raftex.thrift index 20972b82af7..d59bd568ff6 100644 --- a/src/interface/raftex.thrift +++ b/src/interface/raftex.thrift @@ -10,29 +10,27 @@ cpp_include "common/thrift/ThriftTypes.h" enum ErrorCode { SUCCEEDED = 0; - E_LOG_GAP = -1; - E_LOG_STALE = -2; - E_MISSING_COMMIT = -3; - E_WAITING_SNAPSHOT = -4; // The follower is waiting a snapshot - - E_UNKNOWN_PART = -5; - E_TERM_OUT_OF_DATE = -6; - E_LAST_LOG_TERM_TOO_OLD = -7; - E_BAD_STATE = -8; - E_WRONG_LEADER = -9; - E_WAL_FAIL = -10; - E_NOT_READY = -11; + E_UNKNOWN_PART = -1; - // Local errors - E_HOST_STOPPED = -12; - E_NOT_A_LEADER = -13; - E_HOST_DISCONNECTED = -14; - E_TOO_MANY_REQUESTS = -15; - E_PERSIST_SNAPSHOT_FAILED = -16; + // Raft consensus errors + E_LOG_GAP = -2; + E_LOG_STALE = -3; + E_TERM_OUT_OF_DATE = -4; - E_BAD_ROLE = -17, + // Raft state errors + E_WAITING_SNAPSHOT = -5; // The follower is waiting a snapshot + E_BAD_STATE = -6; + E_WRONG_LEADER = -7; + E_NOT_READY = -8; + E_BAD_ROLE = -9, - E_EXCEPTION = -20; // An thrift internal exception was thrown + // Local errors + E_WAL_FAIL = -10; + E_HOST_STOPPED = -11; + E_TOO_MANY_REQUESTS = -12; + E_PERSIST_SNAPSHOT_FAILED = -13; + E_RPC_EXCEPTION = -14; // An thrift internal exception was thrown + E_NO_WAL_FOUND = -15; } typedef i64 (cpp.type = "nebula::ClusterID") ClusterID @@ -103,8 +101,6 @@ struct AppendLogRequest { // 10: TermID log_term; 11: list log_str_list; - - 12: bool sending_snapshot; } diff --git a/src/kvstore/NebulaSnapshotManager.cpp b/src/kvstore/NebulaSnapshotManager.cpp index 6a2fd3ec8fc..f9401ca04fb 100644 --- a/src/kvstore/NebulaSnapshotManager.cpp +++ b/src/kvstore/NebulaSnapshotManager.cpp @@ -10,7 +10,7 @@ #include "kvstore/RateLimiter.h" DEFINE_uint32(snapshot_part_rate_limit, - 1024 * 1024 * 8, + 1024 * 1024 * 10, "max bytes of pulling snapshot for each partition in one second"); DEFINE_uint32(snapshot_batch_size, 1024 * 512, "batch size for snapshot, in bytes"); @@ -21,7 +21,7 @@ const int32_t kReserveNum = 1024 * 4; NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) { // Snapshot rate is limited to FLAGS_snapshot_worker_threads * FLAGS_snapshot_part_rate_limit. - // So by default, the total send rate is limited to 4 * 8Mb = 32Mb. + // So by default, the total send rate is limited to 4 * 10Mb = 40Mb. LOG(INFO) << "Send snapshot is rate limited to " << FLAGS_snapshot_part_rate_limit << " for each part by default"; } diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index 20d8a400050..ab79d2b9195 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -50,11 +50,6 @@ cpp2::ErrorCode Host::checkStatus() const { return cpp2::ErrorCode::E_HOST_STOPPED; } - if (paused_) { - VLOG(2) << idStr_ << "The host is paused, due to losing leadership"; - return cpp2::ErrorCode::E_NOT_A_LEADER; - } - return cpp2::ErrorCode::SUCCEEDED; } @@ -70,8 +65,7 @@ folly::Future Host::askForVote(const cpp2::AskForVoteR return resp; } } - auto client = - part_->clientMan_->client(addr_, eb, false, FLAGS_raft_heartbeat_interval_secs * 1000); + auto client = part_->clientMan_->client(addr_, eb, false, FLAGS_raft_rpc_timeout_ms); return client->future_askForVote(req); } @@ -89,23 +83,18 @@ folly::Future Host::appendLogs(folly::EventBase* eb, std::lock_guard g(lock_); auto res = checkStatus(); - if (logId <= lastLogIdSent_) { - LOG(INFO) << idStr_ << "The log " << logId << " has been sended" - << ", lastLogIdSent " << lastLogIdSent_; - cpp2::AppendLogResponse r; - r.set_error_code(cpp2::ErrorCode::SUCCEEDED); - return r; - } - if (requestOnGoing_ && res == cpp2::ErrorCode::SUCCEEDED) { + if (UNLIKELY(sendingSnapshot_)) { + LOG_EVERY_N(INFO, 500) << idStr_ << "The target host is waiting for a snapshot"; + res = cpp2::ErrorCode::E_WAITING_SNAPSHOT; + } else if (requestOnGoing_) { + // buffer incoming request to pendingReq_ if (cachingPromise_.size() <= FLAGS_max_outstanding_requests) { pendingReq_ = std::make_tuple(term, logId, committedLogId); return cachingPromise_.getFuture(); } else { LOG_EVERY_N(INFO, 200) << idStr_ << "Too many requests are waiting, return error"; - cpp2::AppendLogResponse r; - r.set_error_code(cpp2::ErrorCode::E_TOO_MANY_REQUESTS); - return r; + res = cpp2::ErrorCode::E_TOO_MANY_REQUESTS; } } @@ -129,14 +118,23 @@ folly::Future Host::appendLogs(folly::EventBase* eb, logTermToSend_ = term; logIdToSend_ = logId; committedLogId_ = committedLogId; - pendingReq_ = std::make_tuple(0, 0, 0); - promise_ = std::move(cachingPromise_); - cachingPromise_ = folly::SharedPromise(); - ret = promise_.getFuture(); - - requestOnGoing_ = true; - req = prepareAppendLogRequest(); + auto result = prepareAppendLogRequest(); + if (ok(result)) { + LOG_IF(INFO, FLAGS_trace_raft) << idStr_ << "Sending the pending request in the queue" + << ", from " << lastLogIdSent_ + 1 << " to " << logIdToSend_; + req = std::move(value(result)); + pendingReq_ = std::make_tuple(0, 0, 0); + promise_ = std::move(cachingPromise_); + cachingPromise_ = folly::SharedPromise(); + ret = promise_.getFuture(); + requestOnGoing_ = true; + } else { + // target host is waiting for a snapshot or wal not found + cpp2::AppendLogResponse r; + r.set_error_code(error(result)); + return r; + } } // Get a new promise @@ -152,6 +150,7 @@ void Host::setResponse(const cpp2::AppendLogResponse& r) { cachingPromise_ = folly::SharedPromise(); pendingReq_ = std::make_tuple(0, 0, 0); requestOnGoing_ = false; + noMoreRequestCV_.notify_all(); } void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr req) { @@ -167,7 +166,9 @@ void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptrlastLogIdSent_ << ", lastLogTermSent_ " << self->lastLogTermSent_; switch (resp.get_error_code()) { - case cpp2::ErrorCode::SUCCEEDED: { + case cpp2::ErrorCode::SUCCEEDED: + case cpp2::ErrorCode::E_LOG_GAP: + case cpp2::ErrorCode::E_LOG_STALE: { VLOG(2) << self->idStr_ << "AppendLog request sent successfully"; std::shared_ptr newReq; @@ -175,161 +176,44 @@ void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr g(self->lock_); auto res = self->checkStatus(); if (res != cpp2::ErrorCode::SUCCEEDED) { - VLOG(2) << self->idStr_ - << "The host is not in a proper status," - " just return"; - cpp2::AppendLogResponse r; - r.set_error_code(res); - self->setResponse(r); - } else if (self->lastLogIdSent_ >= resp.get_last_log_id()) { - VLOG(2) << self->idStr_ << "We send nothing in the last request" - << ", so we don't send the same logs again"; - self->followerCommittedLogId_ = resp.get_committed_log_id(); cpp2::AppendLogResponse r; r.set_error_code(res); self->setResponse(r); - } else { - self->lastLogIdSent_ = resp.get_last_log_id(); - self->lastLogTermSent_ = resp.get_last_log_term(); - self->followerCommittedLogId_ = resp.get_committed_log_id(); - if (self->lastLogIdSent_ < self->logIdToSend_) { - // More to send - VLOG(2) << self->idStr_ << "There are more logs to send"; - newReq = self->prepareAppendLogRequest(); - } else { - VLOG(2) << self->idStr_ - << "Fulfill the promise, size = " << self->promise_.size(); - // Fulfill the promise - self->promise_.setValue(resp); - - if (self->noRequest()) { - VLOG(2) << self->idStr_ << "No request any more!"; - self->requestOnGoing_ = false; - } else { - auto& tup = self->pendingReq_; - self->logTermToSend_ = std::get<0>(tup); - self->logIdToSend_ = std::get<1>(tup); - self->committedLogId_ = std::get<2>(tup); - VLOG(2) << self->idStr_ << "Sending the pending request in the queue" - << ", from " << self->lastLogIdSent_ + 1 << " to " - << self->logIdToSend_; - newReq = self->prepareAppendLogRequest(); - self->promise_ = std::move(self->cachingPromise_); - self->cachingPromise_ = folly::SharedPromise(); - self->pendingReq_ = std::make_tuple(0, 0, 0); - } // self->noRequest() - } // self->lastLogIdSent_ < self->logIdToSend_ - } // else - } - if (newReq) { - self->appendLogsInternal(eb, newReq); - } else { - self->noMoreRequestCV_.notify_all(); - } - return; - } - case cpp2::ErrorCode::E_LOG_GAP: { - VLOG(2) << self->idStr_ << "The host's log is behind, need to catch up"; - std::shared_ptr newReq; - { - std::lock_guard g(self->lock_); - auto res = self->checkStatus(); - if (res != cpp2::ErrorCode::SUCCEEDED) { - VLOG(2) << self->idStr_ - << "The host is not in a proper status," - " skip catching up the gap"; - cpp2::AppendLogResponse r; - r.set_error_code(res); - self->setResponse(r); - } else if (self->lastLogIdSent_ == resp.get_last_log_id()) { - VLOG(2) << self->idStr_ << "We send nothing in the last request" - << ", so we don't send the same logs again"; - self->lastLogIdSent_ = resp.get_last_log_id(); - self->lastLogTermSent_ = resp.get_last_log_term(); - self->followerCommittedLogId_ = resp.get_committed_log_id(); - cpp2::AppendLogResponse r; - r.set_error_code(cpp2::ErrorCode::SUCCEEDED); - self->setResponse(r); - } else { - self->lastLogIdSent_ = std::min(resp.get_last_log_id(), self->logIdToSend_ - 1); - self->lastLogTermSent_ = resp.get_last_log_term(); - self->followerCommittedLogId_ = resp.get_committed_log_id(); - newReq = self->prepareAppendLogRequest(); + return; } - } - if (newReq) { - self->appendLogsInternal(eb, newReq); - } else { - self->noMoreRequestCV_.notify_all(); - } - return; - } - case cpp2::ErrorCode::E_WAITING_SNAPSHOT: { - LOG(INFO) << self->idStr_ - << "The host is waiting for the snapshot, so we need to " - "send log from " - << "current committedLogId " << self->committedLogId_; - std::shared_ptr newReq; - { - std::lock_guard g(self->lock_); - auto res = self->checkStatus(); - if (res != cpp2::ErrorCode::SUCCEEDED) { - VLOG(2) << self->idStr_ - << "The host is not in a proper status," - " skip waiting the snapshot"; - cpp2::AppendLogResponse r; - r.set_error_code(res); - self->setResponse(r); - } else { - self->lastLogIdSent_ = self->committedLogId_; - self->lastLogTermSent_ = self->logTermToSend_; - self->followerCommittedLogId_ = resp.get_committed_log_id(); - newReq = self->prepareAppendLogRequest(); - } - } - if (newReq) { - self->appendLogsInternal(eb, newReq); - } else { - self->noMoreRequestCV_.notify_all(); - } - return; - } - case cpp2::ErrorCode::E_LOG_STALE: { - VLOG(2) << self->idStr_ << "Log stale, reset lastLogIdSent " << self->lastLogIdSent_ - << " to the followers lastLodId " << resp.get_last_log_id(); - std::shared_ptr newReq; - { - std::lock_guard g(self->lock_); - auto res = self->checkStatus(); - if (res != cpp2::ErrorCode::SUCCEEDED) { - VLOG(2) << self->idStr_ - << "The host is not in a proper status," - " skip waiting the snapshot"; - cpp2::AppendLogResponse r; - r.set_error_code(res); - self->setResponse(r); - } else if (self->logIdToSend_ <= resp.get_last_log_id()) { - VLOG(2) << self->idStr_ << "It means the request has been received by follower"; - self->lastLogIdSent_ = self->logIdToSend_ - 1; - self->lastLogTermSent_ = resp.get_last_log_term(); - self->followerCommittedLogId_ = resp.get_committed_log_id(); - cpp2::AppendLogResponse r; - r.set_error_code(cpp2::ErrorCode::SUCCEEDED); - self->setResponse(r); + // Host is working + self->lastLogIdSent_ = resp.get_last_log_id(); + self->lastLogTermSent_ = resp.get_last_log_term(); + self->followerCommittedLogId_ = resp.get_committed_log_id(); + if (self->lastLogIdSent_ < self->logIdToSend_) { + // More to send + VLOG(2) << self->idStr_ << "There are more logs to send"; + auto result = self->prepareAppendLogRequest(); + if (ok(result)) { + newReq = std::move(value(result)); + } else { + cpp2::AppendLogResponse r; + r.set_error_code(error(result)); + self->setResponse(r); + return; + } } else { - self->lastLogIdSent_ = std::min(resp.get_last_log_id(), self->logIdToSend_ - 1); - self->lastLogTermSent_ = resp.get_last_log_term(); - self->followerCommittedLogId_ = resp.get_committed_log_id(); - newReq = self->prepareAppendLogRequest(); + // resp.get_last_log_id() >= self->logIdToSend_ + // All logs up to logIdToSend_ has been sent, fulfill the promise + self->promise_.setValue(resp); + // Check if there are any pending request: + // Eithor send pending requst if any, or set Host to vacant + newReq = self->getPendingReqIfAny(self); } } if (newReq) { self->appendLogsInternal(eb, newReq); - } else { - self->noMoreRequestCV_.notify_all(); } return; } + // Usually the peer is not in proper state, for example: + // E_UNKNOWN_PART/E_BAD_STATE/E_NOT_READY/E_WAITING_SNAPSHOT + // In this case, nothing changed, just return the error default: { LOG_EVERY_N(ERROR, 100) << self->idStr_ << "Failed to append logs to the host (Err: " @@ -337,9 +221,7 @@ void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr g(self->lock_); self->setResponse(resp); - self->lastLogIdSent_ = self->logIdToSend_ - 1; } - self->noMoreRequestCV_.notify_all(); return; } } @@ -348,66 +230,63 @@ void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptridStr_ << ex.what(); cpp2::AppendLogResponse r; - r.set_error_code(cpp2::ErrorCode::E_EXCEPTION); + r.set_error_code(cpp2::ErrorCode::E_RPC_EXCEPTION); { std::lock_guard g(self->lock_); if (ex.getType() == TransportException::TIMED_OUT) { - VLOG(2) << self->idStr_ << "append log time out" - << ", space " << req->get_space() << ", part " << req->get_part() - << ", current term " << req->get_current_term() << ", last_log_id " - << req->get_last_log_id() << ", committed_id " - << req->get_committed_log_id() << ", last_log_term_sent" - << req->get_last_log_term_sent() << ", last_log_id_sent " - << req->get_last_log_id_sent() << ", logs size " - << req->get_log_str_list().size(); + LOG_IF(INFO, FLAGS_trace_raft) + << self->idStr_ << "append log time out" + << ", space " << req->get_space() << ", part " << req->get_part() + << ", current term " << req->get_current_term() << ", last_log_id " + << req->get_last_log_id() << ", committed_id " + << req->get_committed_log_id() << ", last_log_term_sent " + << req->get_last_log_term_sent() << ", last_log_id_sent " + << req->get_last_log_id_sent() << ", set lastLogIdSent_ to logIdToSend_ " + << self->logIdToSend_ << ", logs size " + << req->get_log_str_list().size(); } self->setResponse(r); - self->lastLogIdSent_ = self->logIdToSend_ - 1; } // a new raft log or heartbeat will trigger another appendLogs in Host - self->noMoreRequestCV_.notify_all(); return; }) .thenError(folly::tag_t{}, [self = shared_from_this()](std::exception&& ex) { VLOG(2) << self->idStr_ << ex.what(); cpp2::AppendLogResponse r; - r.set_error_code(cpp2::ErrorCode::E_EXCEPTION); + r.set_error_code(cpp2::ErrorCode::E_RPC_EXCEPTION); { std::lock_guard g(self->lock_); self->setResponse(r); - self->lastLogIdSent_ = self->logIdToSend_ - 1; } // a new raft log or heartbeat will trigger another appendLogs in Host - self->noMoreRequestCV_.notify_all(); return; }); } -std::shared_ptr Host::prepareAppendLogRequest() { +ErrorOr> Host::prepareAppendLogRequest() { CHECK(!lock_.try_lock()); - auto req = std::make_shared(); - req->set_space(part_->spaceId()); - req->set_part(part_->partitionId()); - req->set_current_term(logTermToSend_); - req->set_last_log_id(logIdToSend_); - req->set_leader_addr(part_->address().host); - req->set_leader_port(part_->address().port); - req->set_committed_log_id(committedLogId_); - req->set_last_log_term_sent(lastLogTermSent_); - req->set_last_log_id_sent(lastLogIdSent_); - VLOG(2) << idStr_ << "Prepare AppendLogs request from Log " << lastLogIdSent_ + 1 << " to " << logIdToSend_; if (lastLogIdSent_ + 1 > part_->wal()->lastLogId()) { - LOG(INFO) << idStr_ << "My lastLogId in wal is " << part_->wal()->lastLogId() - << ", but you are seeking " << lastLogIdSent_ + 1 << ", so i have nothing to send."; - return req; + LOG_IF(INFO, FLAGS_trace_raft) + << idStr_ << "My lastLogId in wal is " << part_->wal()->lastLogId() + << ", but you are seeking " << lastLogIdSent_ + 1 + << ", so i have nothing to send, logIdToSend_ = " << logIdToSend_; + return cpp2::ErrorCode::E_NO_WAL_FOUND; } auto it = part_->wal()->iterator(lastLogIdSent_ + 1, logIdToSend_); if (it->valid()) { - VLOG(2) << idStr_ << "Prepare the list of log entries to send"; - auto term = it->logTerm(); + auto req = std::make_shared(); + req->set_space(part_->spaceId()); + req->set_part(part_->partitionId()); + req->set_current_term(logTermToSend_); + req->set_last_log_id(logIdToSend_); + req->set_leader_addr(part_->address().host); + req->set_leader_port(part_->address().port); + req->set_committed_log_id(committedLogId_); + req->set_last_log_term_sent(lastLogTermSent_); + req->set_last_log_id_sent(lastLogIdSent_); req->set_log_term(term); std::vector logs; @@ -420,9 +299,8 @@ std::shared_ptr Host::prepareAppendLogRequest() { logs.emplace_back(std::move(le)); } req->set_log_str_list(std::move(logs)); - req->set_sending_snapshot(false); + return req; } else { - req->set_sending_snapshot(true); if (!sendingSnapshot_) { LOG(INFO) << idStr_ << "Can't find log " << lastLogIdSent_ + 1 << " in wal, send the snapshot" << ", logIdToSend = " << logIdToSend_ @@ -430,21 +308,28 @@ std::shared_ptr Host::prepareAppendLogRequest() { << ", lastLogId in wal = " << part_->wal()->lastLogId(); sendingSnapshot_ = true; part_->snapshot_->sendSnapshot(part_, addr_) - .thenValue([self = shared_from_this()](Status&& status) { + .thenValue([self = shared_from_this()](auto&& status) { + std::lock_guard g(self->lock_); if (status.ok()) { - LOG(INFO) << self->idStr_ << "Send snapshot succeeded!"; + auto commitLogIdAndTerm = status.value(); + self->lastLogIdSent_ = commitLogIdAndTerm.first; + self->lastLogTermSent_ = commitLogIdAndTerm.second; + self->followerCommittedLogId_ = commitLogIdAndTerm.first; + LOG(INFO) << self->idStr_ << "Send snapshot succeeded!" + << " commitLogId = " << commitLogIdAndTerm.first + << " commitLogTerm = " << commitLogIdAndTerm.second; } else { LOG(INFO) << self->idStr_ << "Send snapshot failed!"; // TODO(heng): we should tell the follower i am failed. } self->sendingSnapshot_ = false; + self->noMoreRequestCV_.notify_all(); }); } else { - LOG_EVERY_N(INFO, 30) << idStr_ << "The snapshot req is in queue, please wait for a moment"; + LOG_EVERY_N(INFO, 100) << idStr_ << "The snapshot req is in queue, please wait for a moment"; } + return cpp2::ErrorCode::E_WAITING_SNAPSHOT; } - - return req; } folly::Future Host::sendAppendLogRequest( @@ -466,9 +351,10 @@ folly::Future Host::sendAppendLogRequest( << ", part " << req->get_part() << ", current term " << req->get_current_term() << ", last_log_id " << req->get_last_log_id() << ", committed_id " - << req->get_committed_log_id() << ", last_log_term_sent" + << req->get_committed_log_id() << ", last_log_term_sent " << req->get_last_log_term_sent() << ", last_log_id_sent " - << req->get_last_log_id_sent(); + << req->get_last_log_id_sent() << ", logs in request " + << req->get_log_str_list().size(); // Get client connection auto client = part_->clientMan_->client(addr_, eb, false, FLAGS_raft_rpc_timeout_ms); return client->future_appendLog(*req); @@ -499,7 +385,7 @@ folly::Future Host::sendHeartbeat(folly::EventBase* eb, VLOG(3) << self->idStr_ << "heartbeat call got response"; if (t.hasException()) { cpp2::HeartbeatResponse resp; - resp.set_error_code(cpp2::ErrorCode::E_EXCEPTION); + resp.set_error_code(cpp2::ErrorCode::E_RPC_EXCEPTION); pro.setValue(std::move(resp)); return; } else { @@ -542,5 +428,40 @@ bool Host::noRequest() const { return pendingReq_ == emptyTup; } +std::shared_ptr Host::getPendingReqIfAny(std::shared_ptr self) { + CHECK(!self->lock_.try_lock()); + CHECK(self->requestOnGoing_) << self->idStr_; + + // Check if there are any pending request to send + if (self->noRequest()) { + self->noMoreRequestCV_.notify_all(); + self->requestOnGoing_ = false; + return nullptr; + } + + // there is pending request + auto& tup = self->pendingReq_; + self->logTermToSend_ = std::get<0>(tup); + self->logIdToSend_ = std::get<1>(tup); + self->committedLogId_ = std::get<2>(tup); + + LOG_IF(INFO, FLAGS_trace_raft) << self->idStr_ << "Sending the pending request in the queue" + << ", from " << self->lastLogIdSent_ + 1 << " to " + << self->logIdToSend_; + self->pendingReq_ = std::make_tuple(0, 0, 0); + self->promise_ = std::move(self->cachingPromise_); + self->cachingPromise_ = folly::SharedPromise(); + + auto result = self->prepareAppendLogRequest(); + if (ok(result)) { + return value(result); + } else { + cpp2::AppendLogResponse r; + r.set_error_code(error(result)); + self->setResponse(r); + return nullptr; + } +} + } // namespace raftex } // namespace nebula diff --git a/src/kvstore/raftex/Host.h b/src/kvstore/raftex/Host.h index 3cc23ef0ad8..db52bee54ec 100644 --- a/src/kvstore/raftex/Host.h +++ b/src/kvstore/raftex/Host.h @@ -9,6 +9,7 @@ #include #include "common/base/Base.h" +#include "common/base/ErrorOr.h" #include "common/thrift/ThriftClientManager.h" #include "interface/gen-cpp2/RaftexServiceAsyncClient.h" #include "interface/gen-cpp2/raftex_types.h" @@ -32,18 +33,6 @@ class Host final : public std::enable_shared_from_this { const char* idStr() const { return idStr_.c_str(); } - // This will be called when the shard lost its leadership - void pause() { - std::lock_guard g(lock_); - paused_ = true; - } - - // This will be called when the shard becomes the leader - void resume() { - std::lock_guard g(lock_); - paused_ = false; - } - void stop() { std::lock_guard g(lock_); stopped_ = true; @@ -99,12 +88,14 @@ class Host final : public std::enable_shared_from_this { folly::Future sendHeartbeatRequest( folly::EventBase* eb, std::shared_ptr req); - std::shared_ptr prepareAppendLogRequest(); + ErrorOr> prepareAppendLogRequest(); bool noRequest() const; void setResponse(const cpp2::AppendLogResponse& r); + std::shared_ptr getPendingReqIfAny(std::shared_ptr self); + private: // using Request = std::tuple; @@ -116,10 +107,13 @@ class Host final : public std::enable_shared_from_this { mutable std::mutex lock_; - bool paused_{false}; bool stopped_{false}; + // whether there is a batch of logs for target host in on going bool requestOnGoing_{false}; + // whether there is a snapshot for target host in on going + bool sendingSnapshot_{false}; + std::condition_variable noMoreRequestCV_; folly::SharedPromise promise_; folly::SharedPromise cachingPromise_; @@ -135,7 +129,6 @@ class Host final : public std::enable_shared_from_this { TermID lastLogTermSent_{0}; LogID committedLogId_{0}; - std::atomic_bool sendingSnapshot_{false}; // CommittedLogId of follower LogID followerCommittedLogId_{0}; diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 9e2db03be09..99dead9f85b 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -75,6 +75,8 @@ class AppendLogsIterator final : public LogIterator { LogID firstLogId() const { return firstLogId_; } + LogID lastLogId() const { return firstLogId_ + logs_.size() - 1; } + // Return true if the current log is a AtomicOp, otherwise return false bool processAtomicOp() { while (idx_ < logs_.size()) { @@ -305,7 +307,7 @@ void RaftPart::stop() { decltype(hosts_) hosts; { - std::unique_lock lck(raftLock_); + std::lock_guard lck(raftLock_); status_ = Status::STOPPED; leader_ = {"", 0}; role_ = Role::FOLLOWER; @@ -378,11 +380,11 @@ void RaftPart::preProcessTransLeader(const HostAddr& target) { LOG(INFO) << idStr_ << "I will be the new leader, trigger leader election now!"; bgWorkers_->addTask([self = shared_from_this()] { { - std::unique_lock lck(self->raftLock_); + std::lock_guard lck(self->raftLock_); self->role_ = Role::CANDIDATE; self->leader_ = HostAddr("", 0); } - self->leaderElection(); + self->leaderElection().get(); }); } break; @@ -762,7 +764,8 @@ void RaftPart::replicateLogs(folly::EventBase* eb, return; } - VLOG(2) << idStr_ << "About to replicate logs to all peer hosts"; + LOG_IF(INFO, FLAGS_trace_raft) << idStr_ << "About to replicate logs in range [" + << iter.firstLogId() << ", " << lastLogId << "] to all peer hosts"; lastMsgSentDur_.reset(); SlowOpTracker tracker; @@ -973,21 +976,6 @@ bool RaftPart::prepareElectionRequest(cpp2::AskForVoteRequest& req, return false; } - if (UNLIKELY(status_ == Status::STOPPED)) { - VLOG(2) << idStr_ << "The part has been stopped, skip the request"; - return false; - } - - if (UNLIKELY(status_ == Status::STARTING)) { - VLOG(2) << idStr_ << "The partition is still starting"; - return false; - } - - if (UNLIKELY(status_ == Status::WAITING_SNAPSHOT)) { - VLOG(2) << idStr_ << "The partition is still waiting snapshot"; - return false; - } - // Make sure the role is still CANDIDATE if (role_ != Role::CANDIDATE) { VLOG(2) << idStr_ << "A leader has been elected"; @@ -1027,7 +1015,7 @@ typename RaftPart::Role RaftPart::processElectionResponses( } if (UNLIKELY(status_ == Status::WAITING_SNAPSHOT)) { - LOG(INFO) << idStr_ << "The partition is still waitiong snapshot"; + LOG(INFO) << idStr_ << "The partition is still waiting snapshot"; return role_; } @@ -1065,15 +1053,15 @@ typename RaftPart::Role RaftPart::processElectionResponses( return role_; } -bool RaftPart::leaderElection() { +folly::Future RaftPart::leaderElection() { VLOG(2) << idStr_ << "Start leader election..."; using namespace folly; // NOLINT since the fancy overload of | operator bool expected = false; + if (!inElection_.compare_exchange_strong(expected, true)) { - return true; + return false; } - SCOPE_EXIT { inElection_ = false; }; cpp2::AskForVoteRequest voteReq; decltype(hosts_) hosts; @@ -1088,6 +1076,7 @@ bool RaftPart::leaderElection() { // So we neeed to go back to the follower state to avoid the case. std::lock_guard g(raftLock_); role_ = Role::FOLLOWER; + inElection_ = false; return false; } @@ -1102,53 +1091,66 @@ bool RaftPart::leaderElection() { auto proposedTerm = voteReq.get_term(); auto resps = ElectionResponses(); if (hosts.empty()) { - VLOG(2) << idStr_ << "No peer found, I will be the leader"; + auto ret = handleElectionResponses(resps, hosts, proposedTerm); + inElection_ = false; + return ret; } else { + folly::Promise promise; + auto future = promise.getFuture(); auto eb = ioThreadPool_->getEventBase(); - auto futures = collectNSucceeded( - gen::from(hosts) | gen::map([eb, self = shared_from_this(), &voteReq](auto& host) { - VLOG(2) << self->idStr_ << "Sending AskForVoteRequest to " << host->idStr(); - return via(eb, [&voteReq, &host, eb]() -> Future { - return host->askForVote(voteReq, eb); - }); - }) | gen::as(), + collectNSucceeded( + gen::from(hosts) | + gen::map([eb, self = shared_from_this(), voteReq](std::shared_ptr host) { + VLOG(2) << self->idStr_ << "Sending AskForVoteRequest to " << host->idStr(); + return via(eb, [voteReq, host, eb]() -> Future { + return host->askForVote(voteReq, eb); + }); + }) | + gen::as(), // Number of succeeded required quorum_, // Result evaluator [hosts](size_t idx, cpp2::AskForVoteResponse& resp) { return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED && !hosts[idx]->isLearner(); + }) + .via(executor_.get()) + .then([self = shared_from_this(), pro = std::move(promise), hosts, proposedTerm]( + auto&& t) mutable { + VLOG(2) << self->idStr_ + << "AskForVoteRequest has been sent to all peers, waiting for responses"; + CHECK(!t.hasException()); + pro.setValue(self->handleElectionResponses(t.value(), std::move(hosts), proposedTerm)); + self->inElection_ = false; }); - - VLOG(2) << idStr_ - << "AskForVoteRequest has been sent to all peers" - ", waiting for responses"; - futures.wait(); - CHECK(!futures.hasException()) - << "Got exception -- " << futures.result().exception().what().toStdString(); - VLOG(2) << idStr_ << "Got AskForVote response back"; - - resps = std::move(futures).get(); + return future; } +} +bool RaftPart::handleElectionResponses(const ElectionResponses& resps, + const std::vector>& peers, + TermID proposedTerm) { // Process the responses - switch (processElectionResponses(resps, std::move(hosts), proposedTerm)) { + switch (processElectionResponses(resps, std::move(peers), proposedTerm)) { case Role::LEADER: { // Elected LOG(INFO) << idStr_ << "The partition is elected as the leader"; + std::vector> hosts; { std::lock_guard g(raftLock_); if (status_ == Status::RUNNING) { leader_ = addr_; - for (auto& host : hosts_) { - host->reset(); - } + hosts = hosts_; bgWorkers_->addTask( - [self = shared_from_this(), term = voteReq.get_term()] { self->onElected(term); }); + [self = shared_from_this(), proposedTerm] { self->onElected(proposedTerm); }); lastMsgAcceptedTime_ = 0; } weight_ = 1; commitInThisTerm_ = false; } + // reset host can't be executed with raftLock_, otherwise it may encounter deadlock + for (auto& host : hosts) { + host->reset(); + } sendHeartbeat(); return true; } @@ -1184,7 +1186,7 @@ void RaftPart::statusPolling(int64_t startTime) { } size_t delay = FLAGS_raft_heartbeat_interval_secs * 1000 / 3; if (needToStartElection()) { - if (leaderElection()) { + if (leaderElection().get()) { VLOG(2) << idStr_ << "Stop the election"; } else { // No leader has been elected, need to continue @@ -1197,7 +1199,6 @@ void RaftPart::statusPolling(int64_t startTime) { sendHeartbeat(); } if (needToCleanupSnapshot()) { - LOG(INFO) << idStr_ << "Clean up the snapshot"; cleanupSnapshot(); } { @@ -1261,7 +1262,7 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req, if (UNLIKELY(status_ == Status::WAITING_SNAPSHOT)) { LOG(INFO) << idStr_ << "The partition is still waiting snapshot"; - resp.set_error_code(cpp2::ErrorCode::E_NOT_READY); + resp.set_error_code(cpp2::ErrorCode::E_WAITING_SNAPSHOT); return; } @@ -1331,7 +1332,7 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req, // Before change role from leader to follower, check the logs locally. if (role_ == Role::LEADER && wal_->lastLogId() > lastLogId_) { - LOG(INFO) << idStr_ << "There is one log " << wal_->lastLogId() + LOG(INFO) << idStr_ << "There are some logs up to " << wal_->lastLogId() << " i did not commit when i was leader, rollback to " << lastLogId_; wal_->rollbackToLog(lastLogId_); } @@ -1364,11 +1365,11 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req, << ", lastLogTermSent = " << req.get_last_log_term_sent() << ", num_logs = " << req.get_log_str_list().size() << ", logTerm = " << req.get_log_term() - << ", sendingSnapshot = " << req.get_sending_snapshot() << ", local lastLogId = " << lastLogId_ << ", local lastLogTerm = " << lastLogTerm_ << ", local committedLogId = " << committedLogId_ - << ", local current term = " << term_; + << ", local current term = " << term_ + << ", wal lastLogId = " << wal_->lastLogId(); std::lock_guard g(raftLock_); resp.set_current_term(term_); @@ -1389,6 +1390,11 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req, resp.set_error_code(cpp2::ErrorCode::E_NOT_READY); return; } + if (UNLIKELY(status_ == Status::WAITING_SNAPSHOT)) { + VLOG(2) << idStr_ << "The partition is waiting for snapshot"; + resp.set_error_code(cpp2::ErrorCode::E_WAITING_SNAPSHOT); + return; + } // Check leadership cpp2::ErrorCode err = verifyLeader(req); if (err != cpp2::ErrorCode::SUCCEEDED) { @@ -1401,54 +1407,6 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req, // Reset the timeout timer lastMsgRecvDur_.reset(); - if (req.get_sending_snapshot() && status_ != Status::WAITING_SNAPSHOT) { - LOG(INFO) << idStr_ << "Begin to wait for the snapshot" - << " " << req.get_committed_log_id(); - reset(); - status_ = Status::WAITING_SNAPSHOT; - resp.set_error_code(cpp2::ErrorCode::E_WAITING_SNAPSHOT); - return; - } - - if (UNLIKELY(status_ == Status::WAITING_SNAPSHOT)) { - VLOG(2) << idStr_ << "The part is receiving snapshot," - << "so just accept the new wals, but don't commit them." - << "last_log_id_sent " << req.get_last_log_id_sent() << ", total log number " - << req.get_log_str_list().size(); - if (lastLogId_ > 0 && req.get_last_log_id_sent() > lastLogId_) { - // There is a gap - LOG(INFO) << idStr_ << "Local is missing logs from id " << lastLogId_ << ". Need to catch up"; - resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); - return; - } - // TODO(heng): if we have 3 node, one is leader, one is wait snapshot and - // return success, the other is follower, but leader replica log to follow - // failed, How to deal with leader crash? At this time, no leader will be - // elected. - size_t numLogs = req.get_log_str_list().size(); - LogID firstId = req.get_last_log_id_sent() + 1; - - VLOG(2) << idStr_ << "Writing log [" << firstId << ", " << firstId + numLogs - 1 << "] to WAL"; - LogStrListIterator iter(firstId, req.get_log_term(), req.get_log_str_list()); - if (wal_->appendLogs(iter)) { - // When leader has been sending a snapshot already, sometimes it would - // send a request with empty log list, and lastLogId in wal may be 0 - // because of reset. - if (numLogs != 0) { - CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId()) << "First Id is " << firstId; - } - lastLogId_ = wal_->lastLogId(); - lastLogTerm_ = wal_->lastLogTerm(); - resp.set_last_log_id(lastLogId_); - resp.set_last_log_term(lastLogTerm_); - resp.set_error_code(cpp2::ErrorCode::SUCCEEDED); - } else { - LOG_EVERY_N(WARNING, 100) << idStr_ << "Failed to append logs to WAL"; - resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL); - } - return; - } - if (req.get_last_log_id_sent() < committedLogId_ && req.get_last_log_term_sent() <= term_) { LOG(INFO) << idStr_ << "Stale log! The log " << req.get_last_log_id_sent() << ", term " << req.get_last_log_term_sent() << " i had committed yet. My committedLogId is " @@ -1460,7 +1418,7 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req, << ", the log term is " << req.get_last_log_term_sent() << ", but my committedLogId is " << committedLogId_ << ", my term is " << term_ << ", to make the cluster stable i will follow the high term" - << " candidate and clenaup my data"; + << " candidate and cleanup my data"; reset(); resp.set_committed_log_id(committedLogId_); resp.set_last_log_id(lastLogId_); @@ -1469,64 +1427,101 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req, } // req.get_last_log_id_sent() >= committedLogId_ - if (lastLogTerm_ > 0 && req.get_last_log_term_sent() != lastLogTerm_) { - LOG(INFO) << idStr_ << "The local last log term is " << lastLogTerm_ - << ", which is different from the leader's prevLogTerm " - << req.get_last_log_term_sent() << ", the prevLogId is " << req.get_last_log_id_sent() - << ". So need to rollback to last committedLogId_ " << committedLogId_; - if (wal_->rollbackToLog(committedLogId_)) { - lastLogId_ = wal_->lastLogId(); - lastLogTerm_ = wal_->lastLogTerm(); - resp.set_last_log_id(lastLogId_); - resp.set_last_log_term(lastLogTerm_); - LOG(INFO) << idStr_ << "Rollback succeeded! lastLogId is " << lastLogId_ << ", logLogTerm is " - << lastLogTerm_ << ", committedLogId is " << committedLogId_ << ", term is " - << term_; - } - resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); - return; + if (req.get_last_log_id_sent() == lastLogId_ && req.get_last_log_term_sent() == lastLogTerm_) { + // nothing to do + // just append log later } else if (req.get_last_log_id_sent() > lastLogId_) { // There is a gap LOG(INFO) << idStr_ << "Local is missing logs from id " << lastLogId_ << ". Need to catch up"; resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); return; - } else if (req.get_last_log_id_sent() < lastLogId_) { - // TODO(doodle): This is a potential bug which would cause data not in - // consensus. In most case, we would hit this path when leader append logs - // to follower and timeout (leader would set lastLogIdSent_ = logIdToSend_ - - // 1 in Host). **But follower actually received it successfully**. Which - // will explain when leader retry to append these logs, the LOG belows is - // printed, and lastLogId_ == req.get_last_log_id_sent() + 1 in the LOG. - // - // In fact we should always rollback to req.get_last_log_id_sent(), and - // append the logs from leader (we can't make promise that the logs in range - // [req.get_last_log_id_sent() + 1, lastLogId_] is same with follower). - // However, this makes no difference in the above case. - LOG(INFO) << idStr_ << "Stale log! Local lastLogId " << lastLogId_ << ", lastLogTerm " - << lastLogTerm_ << ", lastLogIdSent " << req.get_last_log_id_sent() - << ", lastLogTermSent " << req.get_last_log_term_sent(); - resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE); - return; + } else { + // check the last log term is matched or not + int reqLastLogTerm = wal_->getLogTerm(req.get_last_log_id_sent()); + if (req.get_last_log_term_sent() != reqLastLogTerm) { + LOG(INFO) << idStr_ << "The local log term is " << reqLastLogTerm + << ", which is different from the leader's prevLogTerm " + << req.get_last_log_term_sent() << ", the prevLogId is " + << req.get_last_log_id_sent() << ". So ask leader to send logs from committedLogId " + << committedLogId_; + TermID committedLogTerm = wal_->getLogTerm(committedLogId_); + if (committedLogTerm > 0) { + resp.set_last_log_id(committedLogId_); + resp.set_last_log_term(committedLogTerm); + } + resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); + return; + } } - // Append new logs + // request get_last_log_term_sent == wal[get_last_log_id_sent].log_term size_t numLogs = req.get_log_str_list().size(); LogID firstId = req.get_last_log_id_sent() + 1; - VLOG(2) << idStr_ << "Writing log [" << firstId << ", " << firstId + numLogs - 1 << "] to WAL"; - LogStrListIterator iter(firstId, req.get_log_term(), req.get_log_str_list()); - if (wal_->appendLogs(iter)) { - if (numLogs != 0) { - CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId()) << "First Id is " << firstId; + + size_t diffIndex = 0; + do { + // find the first id/term not match, rollback until it, and append the remaining wal + if (!(req.get_last_log_id_sent() == lastLogId_ && + req.get_last_log_term_sent() == lastLogTerm_)) { + // check the diff index in log, find the first log which term is not same as term in request + { + std::unique_ptr it = wal_->iterator(firstId, firstId + numLogs - 1); + for (size_t i = 0; i < numLogs && it->valid(); i++, ++(*it), diffIndex++) { + int logTerm = it->logTerm(); + if (req.get_log_term() != logTerm) { + break; + } + } + } + + // stale log + if (diffIndex == numLogs) { + // All logs have been received before + resp.set_last_log_id(firstId + numLogs - 1); + resp.set_last_log_term(req.get_log_term()); + // nothing to append, goto commit + break; + } + + // rollback the wal + if (wal_->rollbackToLog(firstId + diffIndex - 1)) { + lastLogId_ = wal_->lastLogId(); + lastLogTerm_ = wal_->lastLogTerm(); + LOG(INFO) << idStr_ << "Rollback succeeded! lastLogId is " << lastLogId_ + << ", logLogTerm is " << lastLogTerm_ << ", committedLogId is " << committedLogId_ + << ", logs in request " << numLogs << ", remaining logs after rollback " + << numLogs - diffIndex; + } else { + LOG(ERROR) << idStr_ << "Rollback fail! lastLogId is" << lastLogId_ << ", logLogTerm is " + << lastLogTerm_ << ", committedLogId is " << committedLogId_ + << ", rollback id is " << firstId + diffIndex - 1; + resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL); + return; + } + + // update msg + firstId = firstId + diffIndex; + numLogs = numLogs - diffIndex; } - lastLogId_ = wal_->lastLogId(); - lastLogTerm_ = wal_->lastLogTerm(); - resp.set_last_log_id(lastLogId_); - resp.set_last_log_term(lastLogTerm_); - } else { - LOG_EVERY_N(WARNING, 100) << idStr_ << "Failed to append logs to WAL"; - resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL); - return; - } + + // Append new logs + std::vector logEntries = std::vector( + std::make_move_iterator(req.get_log_str_list().begin() + diffIndex), + std::make_move_iterator(req.get_log_str_list().end())); + LogStrListIterator iter(firstId, req.get_log_term(), std::move(logEntries)); + if (wal_->appendLogs(iter)) { + if (numLogs != 0) { + CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId()) << "First Id is " << firstId; + } + lastLogId_ = wal_->lastLogId(); + lastLogTerm_ = wal_->lastLogTerm(); + resp.set_last_log_id(lastLogId_); + resp.set_last_log_term(lastLogTerm_); + } else { + resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL); + return; + } + } while (false); LogID lastLogIdCanCommit = std::min(lastLogId_, req.get_committed_log_id()); if (lastLogIdCanCommit > committedLogId_) { @@ -1738,14 +1733,12 @@ void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req, } if (req.get_done()) { committedLogId_ = req.get_committed_log_id(); - if (lastLogId_ < committedLogId_) { - lastLogId_ = committedLogId_; - lastLogTerm_ = req.get_committed_log_term(); - } - if (wal_->lastLogId() <= committedLogId_) { - LOG(INFO) << idStr_ << "Reset invalid wal after snapshot received"; - wal_->reset(); - } + lastLogId_ = committedLogId_; + lastLogTerm_ = req.get_committed_log_term(); + term_ = proposedTerm_ = lastLogTerm_; + // there should be no wal after state converts to WAITING_SNAPSHOT, the RaftPart has been reset + DCHECK_EQ(wal_->firstLogId(), 0); + DCHECK_EQ(wal_->lastLogId(), 0); status_ = Status::RUNNING; LOG(INFO) << idStr_ << "Receive all snapshot, committedLogId_ " << committedLogId_ << ", lastLodId " << lastLogId_ << ", lastLogTermId " << lastLogTerm_; diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index 437afdbaead..6454ee96d9b 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -344,8 +344,9 @@ class RaftPart : public std::enable_shared_from_this { void cleanupSnapshot(); // The method sends out AskForVote request - // It return true if a leader is elected, otherwise returns false - bool leaderElection(); + // Return true if a leader is elected (the leader could be self or others), + // otherwise returns false + folly::Future leaderElection(); // The method will fill up the request object and return TRUE // if the election should continue. Otherwise the method will @@ -353,6 +354,11 @@ class RaftPart : public std::enable_shared_from_this { bool prepareElectionRequest(cpp2::AskForVoteRequest& req, std::vector>& hosts); + // return true if elected as the leader, else return false + bool handleElectionResponses(const ElectionResponses& resps, + const std::vector>& hosts, + TermID proposedTerm); + // The method returns the partition's role after the election Role processElectionResponses(const ElectionResponses& results, std::vector> hosts, diff --git a/src/kvstore/raftex/SnapshotManager.cpp b/src/kvstore/raftex/SnapshotManager.cpp index 20f1f31715e..b6a3adc268f 100644 --- a/src/kvstore/raftex/SnapshotManager.cpp +++ b/src/kvstore/raftex/SnapshotManager.cpp @@ -26,9 +26,9 @@ SnapshotManager::SnapshotManager() { std::make_shared("snapshot-ioexecutor"))); } -folly::Future SnapshotManager::sendSnapshot(std::shared_ptr part, - const HostAddr& dst) { - folly::Promise p; +folly::Future>> SnapshotManager::sendSnapshot( + std::shared_ptr part, const HostAddr& dst) { + folly::Promise>> p; auto fut = p.getFuture(); executor_->add([this, p = std::move(p), part, dst]() mutable { auto spaceId = part->spaceId_; @@ -40,7 +40,7 @@ folly::Future SnapshotManager::sendSnapshot(std::shared_ptr pa auto commitLogIdAndTerm = part->lastCommittedLogId(); const auto& localhost = part->address(); std::vector> results; - LOG(INFO) << part->idStr_ << "Begin to send the snapshot" + LOG(INFO) << part->idStr_ << "Begin to send the snapshot to the host " << dst << ", commitLogId = " << commitLogIdAndTerm.first << ", commitLogTerm = " << commitLogIdAndTerm.second; accessAllRowsInSnapshot( @@ -77,7 +77,7 @@ folly::Future SnapshotManager::sendSnapshot(std::shared_ptr pa if (status == SnapshotStatus::DONE) { LOG(INFO) << part->idStr_ << "Finished, totalCount " << totalCount << ", totalSize " << totalSize; - p.setValue(Status::OK()); + p.setValue(commitLogIdAndTerm); } return true; } else { @@ -90,6 +90,7 @@ folly::Future SnapshotManager::sendSnapshot(std::shared_ptr pa } catch (const std::exception& e) { LOG(ERROR) << part->idStr_ << "Send snapshot failed, exception " << e.what() << ", retry " << retry << " times"; + sleep(1); continue; } } diff --git a/src/kvstore/raftex/SnapshotManager.h b/src/kvstore/raftex/SnapshotManager.h index a7c40ac3527..de613caaa6f 100644 --- a/src/kvstore/raftex/SnapshotManager.h +++ b/src/kvstore/raftex/SnapshotManager.h @@ -38,7 +38,8 @@ class SnapshotManager { virtual ~SnapshotManager() = default; // Send snapshot for spaceId, partId to host dst. - folly::Future sendSnapshot(std::shared_ptr part, const HostAddr& dst); + folly::Future>> sendSnapshot(std::shared_ptr part, + const HostAddr& dst); private: folly::Future send(GraphSpaceID spaceId, diff --git a/src/kvstore/raftex/test/CMakeLists.txt b/src/kvstore/raftex/test/CMakeLists.txt index c40b29720c3..d5368fae5fa 100644 --- a/src/kvstore/raftex/test/CMakeLists.txt +++ b/src/kvstore/raftex/test/CMakeLists.txt @@ -124,21 +124,6 @@ nebula_add_test( gtest ) -nebula_add_test( - NAME - snapshot_test - SOURCES - SnapshotTest.cpp - RaftexTestBase.cpp - TestShard.cpp - OBJECTS - ${RAFTEX_TEST_LIBS} - LIBRARIES - ${THRIFT_LIBRARIES} - wangle - gtest -) - nebula_add_test( NAME member_change_test diff --git a/src/kvstore/raftex/test/SnapshotTest.cpp b/src/kvstore/raftex/test/SnapshotTest.cpp deleted file mode 100644 index 9ff5a6eb656..00000000000 --- a/src/kvstore/raftex/test/SnapshotTest.cpp +++ /dev/null @@ -1,100 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include -#include - -#include "common/base/Base.h" -#include "common/fs/FileUtils.h" -#include "common/fs/TempDir.h" -#include "common/network/NetworkUtils.h" -#include "common/thread/GenericThreadPool.h" -#include "kvstore/raftex/RaftexService.h" -#include "kvstore/raftex/test/RaftexTestBase.h" -#include "kvstore/raftex/test/TestShard.h" - -DECLARE_uint32(raft_heartbeat_interval_secs); -DECLARE_int32(wal_ttl); -DECLARE_int64(wal_file_size); -DECLARE_int32(wal_buffer_size); -DECLARE_int32(wal_buffer_num); -DECLARE_int32(raft_rpc_timeout_ms); - -namespace nebula { -namespace raftex { - -TEST(SnapshotTest, LearnerCatchUpDataTest) { - fs::TempDir walRoot("/tmp/catch_up_data.XXXXXX"); - FLAGS_wal_file_size = 1024; - FLAGS_wal_buffer_size = 512; - FLAGS_raft_rpc_timeout_ms = 2000; - std::shared_ptr workers; - std::vector wals; - std::vector allHosts; - std::vector> services; - std::vector> copies; - - std::shared_ptr leader; - std::vector isLearner = {false, false, false, true}; - setupRaft(4, walRoot, workers, wals, allHosts, services, copies, leader, isLearner); - - // Check all hosts agree on the same leader - checkLeadership(copies, leader); - - std::vector msgs; - for (int i = 0; i < 10; i++) { - appendLogs(i * 100, i * 100 + 99, leader, msgs, true); - } - // Sleep a while to make sure the last log has been committed on followers - sleep(FLAGS_raft_heartbeat_interval_secs); - - // Check every copy - for (int i = 0; i < 3; i++) { - ASSERT_EQ(1000, copies[i]->getNumLogs()); - } - - for (int i = 0; i < 1000; ++i) { - for (int j = 0; j < 3; j++) { - folly::StringPiece msg; - ASSERT_TRUE(copies[j]->getLogMsg(i, msg)); - ASSERT_EQ(msgs[i], msg.toString()); - } - } - // wait for the wal to be cleaned - FLAGS_wal_ttl = 1; - sleep(FLAGS_wal_ttl + 3); - FLAGS_wal_ttl = 60; - LOG(INFO) << "Add learner, we need to catch up data!"; - auto f = leader->sendCommandAsync(test::encodeLearner(allHosts[3])); - f.wait(); - - LOG(INFO) << "Let's continue to write some logs"; - for (int i = 10; i < 20; i++) { - appendLogs(i * 100, i * 100 + 99, leader, msgs, true); - } - sleep(FLAGS_raft_heartbeat_interval_secs); - - auto& learner = copies[3]; - ASSERT_EQ(2000, learner->getNumLogs()); - for (int i = 0; i < 2000; ++i) { - folly::StringPiece msg; - ASSERT_TRUE(learner->getLogMsg(i, msg)); - ASSERT_EQ(msgs[i], msg.toString()); - } - - LOG(INFO) << "Finished UT"; - finishRaft(services, copies, workers, leader); -} - -} // namespace raftex -} // namespace nebula - -int main(int argc, char** argv) { - testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - - return RUN_ALL_TESTS(); -} diff --git a/src/kvstore/wal/FileBasedWal.cpp b/src/kvstore/wal/FileBasedWal.cpp index ac6af23f6dc..caecf65de55 100644 --- a/src/kvstore/wal/FileBasedWal.cpp +++ b/src/kvstore/wal/FileBasedWal.cpp @@ -353,7 +353,6 @@ void FileBasedWal::rollbackInFile(WalFileInfoPtr info, LogID logId) { } lastLogId_ = logId; lastLogTerm_ = term; - LOG(INFO) << idStr_ << "Rollback to log " << logId; CHECK_GT(pos, 0) << "This wal should have been deleted"; if (pos < FileUtils::fileSize(path)) { @@ -610,6 +609,8 @@ bool FileBasedWal::rollbackToLog(LogID id) { VLOG(1) << "Roll back to log " << id << ", the last WAL file is now \"" << walFiles_.rbegin()->second->path() << "\""; rollbackInFile(walFiles_.rbegin()->second, id); + CHECK_EQ(lastLogId_, id); + CHECK_EQ(walFiles_.rbegin()->second->lastId(), id); } } @@ -631,7 +632,7 @@ bool FileBasedWal::reset() { std::vector files = FileUtils::listAllFilesInDir(dir_.c_str(), false, "*.wal"); for (auto& fn : files) { auto absFn = FileUtils::joinPath(dir_, fn); - LOG(INFO) << "Removing " << absFn; + VLOG(1) << "Removing " << absFn; unlink(absFn.c_str()); } lastLogId_ = firstLogId_ = 0; @@ -714,5 +715,14 @@ size_t FileBasedWal::accessAllWalInfo(std::function f return count; } +TermID FileBasedWal::getLogTerm(LogID id) { + TermID term = -1; + auto iter = iterator(id, id); + if (iter->valid()) { + term = iter->logTerm(); + } + return term; +} + } // namespace wal } // namespace nebula diff --git a/src/kvstore/wal/FileBasedWal.h b/src/kvstore/wal/FileBasedWal.h index f10cb52d989..57d9439a5d3 100644 --- a/src/kvstore/wal/FileBasedWal.h +++ b/src/kvstore/wal/FileBasedWal.h @@ -72,6 +72,9 @@ class FileBasedWal final : public Wal, public std::enable_shared_from_this wal, LogID startI } if (startId < wal_->firstLogId()) { - LOG(ERROR) << wal_->idStr_ << "The given log id " << startId - << " is out of the range, the wal firstLogId is " << wal_->firstLogId(); + VLOG(1) << wal_->idStr_ << "The given log id " << startId + << " is out of the range, the wal firstLogId is " << wal_->firstLogId(); currId_ = lastId_ + 1; return; } diff --git a/src/kvstore/wal/test/FileBasedWalTest.cpp b/src/kvstore/wal/test/FileBasedWalTest.cpp index 55b4004561c..a2a6a8a45f5 100644 --- a/src/kvstore/wal/test/FileBasedWalTest.cpp +++ b/src/kvstore/wal/test/FileBasedWalTest.cpp @@ -568,6 +568,40 @@ TEST(FileBasedWal, CleanWalBeforeIdTest) { CHECK_EQ(1000, wal->lastLogId()); } +TEST(FileBasedWal, getLogTermTest) { + TempDir walDir("/tmp/testWal.XXXXXX"); + FileBasedWalInfo info; + FileBasedWalPolicy policy; + policy.fileSize = 1024L * 1024L; + policy.bufferSize = 1024L * 1024L; + + auto wal = FileBasedWal::getWal( + walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + return true; + }); + + // Append > 10MB logs in total + for (int i = 1; i <= 10000; i++) { + ASSERT_TRUE( + wal->appendLog(i /*id*/, i /*term*/, 0 /*cluster*/, folly::stringPrintf(kLongMsg, i))); + } + + // in the memory buffer + ASSERT_EQ(10000, wal->getLogTerm(10000)); + // in the file + ASSERT_EQ(4, wal->getLogTerm(4)); + + // Close the wal + wal.reset(); + + // Now let's open it to read + wal = FileBasedWal::getWal( + walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + return true; + }); + EXPECT_EQ(10, wal->getLogTerm(10)); +} + } // namespace wal } // namespace nebula diff --git a/tests/admin/test_configs.py b/tests/admin/test_configs.py index 9c87734c9f6..a6a7213b3ed 100644 --- a/tests/admin/test_configs.py +++ b/tests/admin/test_configs.py @@ -60,7 +60,7 @@ def test_configs(self): expected_result = [ ['GRAPH', 'v', 'int', 'MUTABLE', v], ['GRAPH', 'minloglevel', 'int', 'MUTABLE', 0], - ['GRAPH', 'slow_op_threshhold_ms', 'int', 'MUTABLE', 50], + ['GRAPH', 'slow_op_threshhold_ms', 'int', 'MUTABLE', 100], ['GRAPH', 'heartbeat_interval_secs', 'int', 'MUTABLE', 1], ['GRAPH', 'meta_client_retry_times', 'int', 'MUTABLE', 3], ['GRAPH', 'accept_partial_success', 'bool', 'MUTABLE', False], @@ -80,7 +80,7 @@ def test_configs(self): ['STORAGE', 'wal_ttl', 'int', 'MUTABLE', 14400], ['STORAGE', 'minloglevel', 'int', 'MUTABLE', 0], ['STORAGE', 'custom_filter_interval_secs', 'int', 'MUTABLE', 86400], - ['STORAGE', 'slow_op_threshhold_ms', 'int', 'MUTABLE', 50], + ['STORAGE', 'slow_op_threshhold_ms', 'int', 'MUTABLE', 100], ['STORAGE', 'heartbeat_interval_secs', 'int', 'MUTABLE', 1], ['STORAGE', 'meta_client_retry_times', 'int', 'MUTABLE', 3], ['STORAGE', 'rocksdb_db_options', 'map', 'MUTABLE', {}],