Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify raft error code #3620

Merged
merged 3 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions src/graph/executor/StorageAccessExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class StorageAccessExecutor : public Executor {
return Status::Error(std::move(error));
}
case nebula::cpp2::ErrorCode::E_LEADER_CHANGED:
return Status::Error("Storage Error: The leader has changed. Try again later");
return Status::Error(
folly::sformat("Storage Error: Not the leader of {}. Please retry later.", partId));
case nebula::cpp2::ErrorCode::E_INVALID_FILTER:
return Status::Error("Storage Error: Invalid filter.");
case nebula::cpp2::ErrorCode::E_INVALID_UPDATER:
Expand All @@ -88,6 +89,8 @@ class StorageAccessExecutor : public Executor {
return Status::Error("Storage Error: Invalid space vid len.");
case nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND:
return Status::Error("Storage Error: Space not found.");
case nebula::cpp2::ErrorCode::E_PART_NOT_FOUND:
return Status::Error(folly::sformat("Storage Error: Part {} not found.", partId));
case nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND:
return Status::Error("Storage Error: Tag not found.");
case nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND:
Expand All @@ -108,14 +111,25 @@ class StorageAccessExecutor : public Executor {
"The not null field doesn't have a default value.");
case nebula::cpp2::ErrorCode::E_OUT_OF_RANGE:
return Status::Error("Storage Error: Out of range value.");
case nebula::cpp2::ErrorCode::E_ATOMIC_OP_FAILED:
return Status::Error("Storage Error: Atomic operation failed.");
case nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR:
return Status::Error(
"Storage Error: More than one request trying to "
"add/update/delete one edge/vertex at the same time.");
case nebula::cpp2::ErrorCode::E_FILTER_OUT:
return Status::OK();
case nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE:
return Status::Error(folly::sformat(
"Storage Error: Term of part {} is out of date. Please retry later.", partId));
case nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL:
return Status::Error("Storage Error: Write wal failed. Probably disk is almost full.");
case nebula::cpp2::ErrorCode::E_RAFT_WRITE_BLOCKED:
return Status::Error(
"Storage Error: Write is blocked when creating snapshot. Please retry later.");
case nebula::cpp2::ErrorCode::E_RAFT_BUFFER_OVERFLOW:
return Status::Error(folly::sformat(
"Storage Error: Part {} raft buffer is full. Please retry later.", partId));
case nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED:
return Status::Error("Storage Error: Atomic operation failed.");
default:
auto status = Status::Error("Storage Error: part: %d, error: %s(%d).",
partId,
Expand Down
29 changes: 26 additions & 3 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,7 @@ enum ErrorCode {
E_FIELD_UNSET = -3007,
// Value exceeds the range of type
E_OUT_OF_RANGE = -3008,
// Atomic operation failed
E_ATOMIC_OP_FAILED = -3009,
E_DATA_CONFLICT_ERROR = -3010, // data conflict, for index write without toss.
E_DATA_CONFLICT_ERROR = -3010, // data conflict, for index write without toss.

E_WRITE_STALLED = -3011,

Expand Down Expand Up @@ -473,5 +471,30 @@ enum ErrorCode {
// get worker id
E_WORKER_ID_FAILED = -3062,

// 35xx for storaged raft
E_RAFT_UNKNOWN_PART = -3500,
// Raft consensus errors
E_RAFT_LOG_GAP = -3501,
E_RAFT_LOG_STALE = -3502,
E_RAFT_TERM_OUT_OF_DATE = -3503,
// Raft state errors
E_RAFT_WAITING_SNAPSHOT = -3511,
E_RAFT_SENDING_SNAPSHOT = -3512,
E_RAFT_INVALID_PEER = -3513,
E_RAFT_NOT_READY = -3514,
E_RAFT_STOPPED = -3515,
E_RAFT_BAD_ROLE = -3516,
// Local errors
E_RAFT_WAL_FAIL = -3521,
E_RAFT_HOST_STOPPED = -3522,
E_RAFT_TOO_MANY_REQUESTS = -3523,
E_RAFT_PERSIST_SNAPSHOT_FAILED = -3524,
E_RAFT_RPC_EXCEPTION = -3525,
E_RAFT_NO_WAL_FOUND = -3526,
E_RAFT_HOST_PAUSED = -3527,
E_RAFT_WRITE_BLOCKED = -3528,
E_RAFT_BUFFER_OVERFLOW = -3529,
E_RAFT_ATOMIC_OP_FAILED = -3530,

E_UNKNOWN = -8000,
} (cpp.enum_strict)
77 changes: 25 additions & 52 deletions src/interface/raftex.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,6 @@ enum Status {
WAITING_SNAPSHOT = 3; // Waiting for the snapshot.
} (cpp.enum_strict)

enum ErrorCode {
SUCCEEDED = 0;

E_UNKNOWN_PART = -1;

// Raft consensus errors
E_LOG_GAP = -2;
E_LOG_STALE = -3;
E_TERM_OUT_OF_DATE = -4;

// Raft state errors
E_WAITING_SNAPSHOT = -5; // The follower is waiting a snapshot
E_BAD_STATE = -6;
E_WRONG_LEADER = -7;
E_NOT_READY = -8;
E_BAD_ROLE = -9,

// Local errors
E_WAL_FAIL = -10;
E_HOST_STOPPED = -11;
E_TOO_MANY_REQUESTS = -12;
E_PERSIST_SNAPSHOT_FAILED = -13;
  E_RPC_EXCEPTION = -14; // A Thrift internal exception was thrown
E_NO_WAL_FOUND = -15;
E_APPLY_FAIL = -16;
E_HOST_PAUSED = -17;
}

typedef i64 (cpp.type = "nebula::ClusterID") ClusterID
typedef i32 (cpp.type = "nebula::GraphSpaceID") GraphSpaceID
Expand All @@ -73,8 +46,8 @@ struct AskForVoteRequest {

// Response message for the vote call
struct AskForVoteResponse {
1: ErrorCode error_code;
2: TermID current_term;
1: common.ErrorCode error_code;
2: TermID current_term;
}

// Log entries being sent to follower, logId is not included, it could be calculated by
Expand All @@ -98,13 +71,13 @@ struct AppendLogRequest {
}

struct AppendLogResponse {
1: ErrorCode error_code;
2: TermID current_term;
3: string leader_addr;
4: Port leader_port;
5: LogID committed_log_id;
6: LogID last_matched_log_id;
7: TermID last_matched_log_term;
1: common.ErrorCode error_code;
2: TermID current_term;
3: string leader_addr;
4: Port leader_port;
5: LogID committed_log_id;
6: LogID last_matched_log_id;
7: TermID last_matched_log_term;
}

struct SendSnapshotRequest {
Expand Down Expand Up @@ -133,17 +106,17 @@ struct HeartbeatRequest {
}

struct HeartbeatResponse {
1: ErrorCode error_code;
2: TermID current_term;
3: string leader_addr;
4: Port leader_port;
5: LogID committed_log_id;
6: LogID last_log_id;
7: TermID last_log_term;
1: common.ErrorCode error_code;
2: TermID current_term;
3: string leader_addr;
4: Port leader_port;
5: LogID committed_log_id;
6: LogID last_log_id;
7: TermID last_log_term;
}

struct SendSnapshotResponse {
1: ErrorCode error_code;
1: common.ErrorCode error_code;
}

struct GetStateRequest {
Expand All @@ -152,14 +125,14 @@ struct GetStateRequest {
}

struct GetStateResponse {
1: ErrorCode error_code;
2: Role role;
3: TermID term;
4: bool is_leader;
5: LogID committed_log_id;
6: LogID last_log_id;
7: TermID last_log_term;
8: Status status;
1: common.ErrorCode error_code;
2: Role role;
3: TermID term;
4: bool is_leader;
5: LogID committed_log_id;
6: LogID last_log_id;
7: TermID last_log_term;
8: Status status;
}

service RaftexService {
Expand Down
6 changes: 3 additions & 3 deletions src/kvstore/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,13 @@ class Listener : public raftex::RaftPart {
LOG(INFO) << idStr_ << "Find the new leader " << nLeader;
}

raftex::cpp2::ErrorCode checkPeer(const HostAddr& candidate) override {
nebula::cpp2::ErrorCode checkPeer(const HostAddr& candidate) override {
CHECK(!raftLock_.try_lock());
if (peers_.find(candidate) == peers_.end()) {
LOG(WARNING) << idStr_ << "The candidate " << candidate << " is not in my peers";
return raftex::cpp2::ErrorCode::E_WRONG_LEADER;
return nebula::cpp2::ErrorCode::E_RAFT_INVALID_PEER;
}
return raftex::cpp2::ErrorCode::SUCCEEDED;
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

// For listener, we just return true directly. Another background thread trigger the actual
Expand Down
85 changes: 28 additions & 57 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ DEFINE_int32(cluster_id, 0, "A unique id for each cluster");
namespace nebula {
namespace kvstore {

using nebula::raftex::AppendLogResult;

Part::Part(GraphSpaceID spaceId,
PartitionID partId,
HostAddr localAddr,
Expand Down Expand Up @@ -69,104 +67,96 @@ void Part::asyncPut(folly::StringPiece key, folly::StringPiece value, KVCallback
std::string log = encodeMultiValues(OP_PUT, key, value);

appendAsync(FLAGS_cluster_id, std::move(log))
.thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncAppendBatch(std::string&& batch, KVCallback cb) {
appendAsync(FLAGS_cluster_id, std::move(batch))
.thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncMultiPut(const std::vector<KV>& keyValues, KVCallback cb) {
std::string log = encodeMultiValues(OP_MULTI_PUT, keyValues);

appendAsync(FLAGS_cluster_id, std::move(log))
.thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncRemove(folly::StringPiece key, KVCallback cb) {
std::string log = encodeSingleValue(OP_REMOVE, key);

appendAsync(FLAGS_cluster_id, std::move(log))
.thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncMultiRemove(const std::vector<std::string>& keys, KVCallback cb) {
std::string log = encodeMultiValues(OP_MULTI_REMOVE, keys);

appendAsync(FLAGS_cluster_id, std::move(log))
.thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncRemoveRange(folly::StringPiece start, folly::StringPiece end, KVCallback cb) {
std::string log = encodeMultiValues(OP_REMOVE_RANGE, start, end);

appendAsync(FLAGS_cluster_id, std::move(log))
.thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::sync(KVCallback cb) {
sendCommandAsync("").thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
sendCommandAsync("").thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncAtomicOp(raftex::AtomicOp op, KVCallback cb) {
atomicOpAsync(std::move(op))
.thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncAddLearner(const HostAddr& learner, KVCallback cb) {
std::string log = encodeHost(OP_ADD_LEARNER, learner);
sendCommandAsync(std::move(log))
.thenValue([callback = std::move(cb), learner, this](AppendLogResult res) mutable {
.thenValue([callback = std::move(cb), learner, this](nebula::cpp2::ErrorCode code) mutable {
LOG(INFO) << idStr_ << "add learner " << learner
<< ", result: " << static_cast<int32_t>(this->toResultCode(res));
callback(this->toResultCode(res));
<< ", result: " << apache::thrift::util::enumNameSafe(code);
callback(code);
});
}

void Part::asyncTransferLeader(const HostAddr& target, KVCallback cb) {
std::string log = encodeHost(OP_TRANS_LEADER, target);
sendCommandAsync(std::move(log))
.thenValue([callback = std::move(cb), target, this](AppendLogResult res) mutable {
.thenValue([callback = std::move(cb), target, this](nebula::cpp2::ErrorCode code) mutable {
LOG(INFO) << idStr_ << "transfer leader to " << target
<< ", result: " << static_cast<int32_t>(this->toResultCode(res));
callback(this->toResultCode(res));
<< ", result: " << apache::thrift::util::enumNameSafe(code);
callback(code);
});
}

void Part::asyncAddPeer(const HostAddr& peer, KVCallback cb) {
std::string log = encodeHost(OP_ADD_PEER, peer);
sendCommandAsync(std::move(log))
.thenValue([callback = std::move(cb), peer, this](AppendLogResult res) mutable {
.thenValue([callback = std::move(cb), peer, this](nebula::cpp2::ErrorCode code) mutable {
LOG(INFO) << idStr_ << "add peer " << peer
<< ", result: " << static_cast<int32_t>(this->toResultCode(res));
callback(this->toResultCode(res));
<< ", result: " << apache::thrift::util::enumNameSafe(code);
callback(code);
});
}

void Part::asyncRemovePeer(const HostAddr& peer, KVCallback cb) {
std::string log = encodeHost(OP_REMOVE_PEER, peer);
sendCommandAsync(std::move(log))
.thenValue([callback = std::move(cb), peer, this](AppendLogResult res) mutable {
.thenValue([callback = std::move(cb), peer, this](nebula::cpp2::ErrorCode code) mutable {
LOG(INFO) << idStr_ << "remove peer " << peer
<< ", result: " << static_cast<int32_t>(this->toResultCode(res));
callback(this->toResultCode(res));
<< ", result: " << apache::thrift::util::enumNameSafe(code);
callback(code);
});
}

Expand Down Expand Up @@ -519,24 +509,5 @@ nebula::cpp2::ErrorCode Part::cleanup() {
std::move(batch), FLAGS_rocksdb_disable_wal, FLAGS_rocksdb_wal_sync, true);
}

// TODO(pandasheep) unify raft errorcode
nebula::cpp2::ErrorCode Part::toResultCode(raftex::AppendLogResult res) {
switch (res) {
case raftex::AppendLogResult::SUCCEEDED:
return nebula::cpp2::ErrorCode::SUCCEEDED;
case raftex::AppendLogResult::E_NOT_A_LEADER:
return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
case raftex::AppendLogResult::E_WRITE_BLOCKING:
return nebula::cpp2::ErrorCode::E_CHECKPOINT_BLOCKED;
case raftex::AppendLogResult::E_ATOMIC_OP_FAILURE:
return nebula::cpp2::ErrorCode::E_ATOMIC_OP_FAILED;
case raftex::AppendLogResult::E_BUFFER_OVERFLOW:
return nebula::cpp2::ErrorCode::E_CONSENSUS_ERROR;
default:
LOG(ERROR) << idStr_ << "Consensus error " << static_cast<int32_t>(res);
return nebula::cpp2::ErrorCode::E_CONSENSUS_ERROR;
}
}

} // namespace kvstore
} // namespace nebula
2 changes: 0 additions & 2 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ class Part : public raftex::RaftPart {

nebula::cpp2::ErrorCode cleanup() override;

nebula::cpp2::ErrorCode toResultCode(raftex::AppendLogResult res);

public:
struct CallbackOptions {
GraphSpaceID spaceId;
Expand Down
Loading