Update listener #4925

Merged (4 commits) on Nov 23, 2022
117 changes: 0 additions & 117 deletions src/kvstore/Listener.cpp
@@ -10,7 +10,6 @@
#include "kvstore/LogEncoder.h"

DEFINE_int32(listener_commit_interval_secs, 1, "Listener commit interval");
DEFINE_int32(listener_commit_batch_size, 1000, "Max batch size when the listener commits");
DEFINE_uint32(ft_request_retry_times, 3, "Retry times if fulltext request failed");
DEFINE_int32(ft_bulk_batch_size, 100, "Max batch size when bulk insert");
DEFINE_int32(listener_pursue_leader_threshold, 1000, "Threshold for catching up with the leader");
@@ -161,123 +160,7 @@ void Listener::doApply() {
});
}

void Listener::processLogs() {
std::unique_ptr<LogIterator> iter;
{
std::lock_guard<std::mutex> guard(raftLock_);
if (lastApplyLogId_ >= committedLogId_) {
return;
}
iter = wal_->iterator(lastApplyLogId_ + 1, committedLogId_);
}

LogID lastApplyId = -1;
// the kv pairs which can be synced to the remote safely
std::vector<KV> data;
while (iter->valid()) {
lastApplyId = iter->logId();

auto log = iter->logMsg();
if (log.empty()) {
// skip the heartbeat
++(*iter);
continue;
}

DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t));
switch (log[sizeof(int64_t)]) {
case OP_PUT: {
auto pieces = decodeMultiValues(log);
DCHECK_EQ(2, pieces.size());
data.emplace_back(pieces[0], pieces[1]);
break;
}
case OP_MULTI_PUT: {
auto kvs = decodeMultiValues(log);
DCHECK_EQ(0, kvs.size() % 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
data.emplace_back(kvs[i], kvs[i + 1]);
}
break;
}
case OP_REMOVE:
case OP_REMOVE_RANGE:
case OP_MULTI_REMOVE: {
break;
}
case OP_BATCH_WRITE: {
auto batch = decodeBatchValue(log);
for (auto& op : batch) {
// OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE are ignored
if (op.first == BatchLogType::OP_BATCH_PUT) {
data.emplace_back(op.second.first, op.second.second);
}
}
break;
}
case OP_TRANS_LEADER:
case OP_ADD_LEARNER:
case OP_ADD_PEER:
case OP_REMOVE_PEER: {
break;
}
default: {
VLOG(2) << idStr_ << "Unknown operation: " << static_cast<int32_t>(log[0]);
}
}

if (static_cast<int32_t>(data.size()) > FLAGS_listener_commit_batch_size) {
break;
}
++(*iter);
}

// apply to state machine
if (lastApplyId != -1 && apply(data)) {
std::lock_guard<std::mutex> guard(raftLock_);
lastApplyLogId_ = lastApplyId;
persist(committedLogId_, term_, lastApplyLogId_);
VLOG(2) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
}
}

std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> Listener::commitSnapshot(
const std::vector<std::string>& rows,
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
VLOG(2) << idStr_ << "Listener is committing snapshot.";
int64_t count = 0;
int64_t size = 0;
std::vector<KV> data;
data.reserve(rows.size());
for (const auto& row : rows) {
count++;
size += row.size();
auto kv = decodeKV(row);
data.emplace_back(kv.first, kv.second);
}
if (!apply(data)) {
LOG(INFO) << idStr_ << "Failed to apply data while committing snapshot.";
return {
nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED, kNoSnapshotCount, kNoSnapshotSize};
}
if (finished) {
CHECK(!raftLock_.try_lock());
leaderCommitId_ = committedLogId;
lastApplyLogId_ = committedLogId;
persist(committedLogId, committedLogTerm, lastApplyLogId_);
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
LOG(INFO) << folly::sformat(
"Commit snapshot to : committedLogId={},"
"committedLogTerm={}, lastApplyLogId_={}",
committedLogId,
committedLogTerm,
lastApplyLogId_);
}
return {nebula::cpp2::ErrorCode::SUCCEEDED, count, size};
}

void Listener::resetListener() {
std::lock_guard<std::mutex> g(raftLock_);
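For context, the part of Listener.cpp that survives this change is the doApply() scheduler: it periodically re-queues itself and hands the actual log handling to processLogs(), which after this PR is supplied by the subclass. A minimal sketch of that pattern follows; FLAGS_listener_commit_interval_secs comes from the definitions above, while the executor_/bgWorkers_ members and the SCOPE_EXIT re-scheduling are illustrative assumptions, since only the closing braces of doApply are visible in this hunk.

void Listener::doApply() {
  // Hand the work to a background executor so the raft thread is not blocked.
  executor_->add([this] {
    SCOPE_EXIT {
      // Queue the next round after listener_commit_interval_secs seconds.
      bgWorkers_->addDelayTask(
          FLAGS_listener_commit_interval_secs * 1000, &Listener::doApply, this);
    };
    processLogs();  // pure virtual after this PR; implemented by e.g. ESListener
  });
}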
29 changes: 2 additions & 27 deletions src/kvstore/Listener.h
@@ -9,6 +9,7 @@
#include "common/base/Base.h"
#include "common/meta/SchemaManager.h"
#include "kvstore/Common.h"
#include "kvstore/LogEncoder.h"
#include "kvstore/raftex/Host.h"
#include "kvstore/raftex/RaftPart.h"
#include "kvstore/wal/FileBasedWal.h"
@@ -180,14 +181,6 @@ class Listener : public raftex::RaftPart {
*/
virtual LogID lastApplyLogId() = 0;

/**
* @brief Apply data into listener's state machine
*
* @param data Key/value to apply
* @return True if the apply succeeded, false otherwise.
*/
virtual bool apply(const std::vector<KV>& data) = 0;

/**
* @brief Persist commitLogId, commitLogTerm and lastApplyLogId
*/
@@ -272,31 +265,13 @@ class Listener : public raftex::RaftPart {
ClusterID clusterId,
folly::StringPiece log) override;

/**
* @brief If the listener falls too far behind the leader, the leader will send all of its data
* in a snapshot, batch by batch; the listener needs to implement this method to apply each batch
* to the state machine. The return value carries the <log count, log size> of this batch.
*
* @param data Data to apply
* @param committedLogId Commit log id of snapshot
* @param committedLogTerm Commit log term of snapshot
* @param finished Whether the snapshot is finished
* @return std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> Returns {ok, count, size} on
* success, otherwise {errorcode, -1, -1}
*/
std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(
const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override;

/**
* @brief The background job thread triggers doApply periodically to apply data to the state machine
*/
void doApply();

// Process logs and then call apply to execute them
virtual void processLogs();
virtual void processLogs() = 0;

protected:
LogID leaderCommitId_ = 0;
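With apply() and commitSnapshot() removed from the base class and processLogs() made pure virtual, a concrete listener now has to provide all three pieces itself, on top of the other pure-virtual hooks such as lastApplyLogId() above. A rough skeleton of what a new listener plugin would declare; the class name is illustrative only, not part of this PR:

class MyListener : public Listener {
 protected:
  // Drain committed WAL entries and push them to the external system.
  void processLogs() override;

  // Apply one batch of a leader-sent snapshot; returns {code, count, size}.
  std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(
      const std::vector<std::string>& rows,
      LogID committedLogId,
      TermID committedLogTerm,
      bool finished) override;

 private:
  // Listener-specific write path; no longer a virtual on the base class.
  bool apply(const std::vector<KV>& data);
};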
118 changes: 118 additions & 0 deletions src/kvstore/plugins/elasticsearch/ESListener.cpp
@@ -10,6 +10,7 @@

DECLARE_uint32(ft_request_retry_times);
DECLARE_int32(ft_bulk_batch_size);
DEFINE_int32(listener_commit_batch_size, 1000, "Max batch size when the listener commits");

namespace nebula {
namespace kvstore {
@@ -244,5 +245,122 @@ bool ESListener::writeDatum(const std::vector<nebula::plugin::DocItem>& items) c
return true;
}

void ESListener::processLogs() {
std::unique_ptr<LogIterator> iter;
{
std::lock_guard<std::mutex> guard(raftLock_);
if (lastApplyLogId_ >= committedLogId_) {
return;
}
iter = wal_->iterator(lastApplyLogId_ + 1, committedLogId_);
}

LogID lastApplyId = -1;
// the kv pairs which can be synced to the remote safely
std::vector<KV> data;
while (iter->valid()) {
lastApplyId = iter->logId();

auto log = iter->logMsg();
if (log.empty()) {
// skip the heartbeat
++(*iter);
continue;
}

DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t));
switch (log[sizeof(int64_t)]) {
case OP_PUT: {
auto pieces = decodeMultiValues(log);
DCHECK_EQ(2, pieces.size());
data.emplace_back(pieces[0], pieces[1]);
break;
}
case OP_MULTI_PUT: {
auto kvs = decodeMultiValues(log);
DCHECK_EQ(0, kvs.size() % 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
data.emplace_back(kvs[i], kvs[i + 1]);
}
break;
}
case OP_REMOVE:
case OP_REMOVE_RANGE:
case OP_MULTI_REMOVE: {
break;
}
case OP_BATCH_WRITE: {
auto batch = decodeBatchValue(log);
for (auto& op : batch) {
// OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE are ignored
if (op.first == BatchLogType::OP_BATCH_PUT) {
data.emplace_back(op.second.first, op.second.second);
}
}
break;
}
case OP_TRANS_LEADER:
case OP_ADD_LEARNER:
case OP_ADD_PEER:
case OP_REMOVE_PEER: {
break;
}
default: {
VLOG(2) << idStr_ << "Unknown operation: " << static_cast<int32_t>(log[0]);
}
}

if (static_cast<int32_t>(data.size()) > FLAGS_listener_commit_batch_size) {
break;
}
++(*iter);
}
// apply to state machine
if (lastApplyId != -1 && apply(data)) {
std::lock_guard<std::mutex> guard(raftLock_);
lastApplyLogId_ = lastApplyId;
persist(committedLogId_, term_, lastApplyLogId_);
VLOG(2) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
}
}

std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> ESListener::commitSnapshot(
const std::vector<std::string>& rows,
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
VLOG(2) << idStr_ << "Listener is committing snapshot.";
int64_t count = 0;
int64_t size = 0;
std::vector<KV> data;
data.reserve(rows.size());
for (const auto& row : rows) {
count++;
size += row.size();
auto kv = decodeKV(row);
data.emplace_back(kv.first, kv.second);
}
if (!apply(data)) {
LOG(INFO) << idStr_ << "Failed to apply data while committing snapshot.";
return {
nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED, kNoSnapshotCount, kNoSnapshotSize};
}
if (finished) {
CHECK(!raftLock_.try_lock());
leaderCommitId_ = committedLogId;
lastApplyLogId_ = committedLogId;
persist(committedLogId, committedLogTerm, lastApplyLogId_);
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
LOG(INFO) << folly::sformat(
"Commit snapshot to : committedLogId={},"
"committedLogTerm={}, lastApplyLogId_={}",
committedLogId,
committedLogTerm,
lastApplyLogId_);
}
return {nebula::cpp2::ErrorCode::SUCCEEDED, count, size};
}

} // namespace kvstore
} // namespace nebula
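Since listener_commit_batch_size is now defined in ESListener.cpp rather than Listener.cpp, any other translation unit that reads the flag has to declare it instead of defining it. A small sketch of that gflags pattern; the consuming file and function are hypothetical:

// Hypothetical consumer in another .cpp file.
#include <gflags/gflags.h>
#include <glog/logging.h>

// The flag is defined (DEFINE_int32) in ESListener.cpp after this PR,
// so other files only declare it.
DECLARE_int32(listener_commit_batch_size);

void logListenerBatchConfig() {
  LOG(INFO) << "listener_commit_batch_size = " << FLAGS_listener_commit_batch_size;
}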
10 changes: 9 additions & 1 deletion src/kvstore/plugins/elasticsearch/ESListener.h
@@ -71,7 +71,7 @@ class ESListener : public Listener {
* @param data Key/value to apply
* @return True if the apply succeeded, false otherwise.
*/
bool apply(const std::vector<KV>& data) override;
bool apply(const std::vector<KV>& data);

/**
* @brief Persist commitLogId, commitLogTerm and lastApplyLogId
@@ -92,6 +92,14 @@
*/
LogID lastApplyLogId() override;

void processLogs() override;

std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> commitSnapshot(
const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override;

private:
/**
* @brief Write last commit id, last commit term, last apply id to a file