Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix br bugs in cluster mode #3604

Merged
merged 5 commits into from
Dec 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conf/nebula-metad.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,4 @@
--default_replica_factor=1

--heartbeat_interval_secs=10
--agent_heartbeat_interval_secs=60
1 change: 1 addition & 0 deletions conf/nebula-metad.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
--default_replica_factor=1

--heartbeat_interval_secs=10
--agent_heartbeat_interval_secs=60

############## rocksdb Options ##############
--rocksdb_wal_sync=true
1 change: 1 addition & 0 deletions resources/gflags.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
"rocksdb_block_based_table_options"
]
}

4 changes: 2 additions & 2 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -931,8 +931,8 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("list cluster failure!");
case nebula::cpp2::ErrorCode::E_LIST_CLUSTER_GET_ABS_PATH_FAILURE:
return Status::Error("Failed to get the absolute path!");
case nebula::cpp2::ErrorCode::E_GET_META_DIR_FAILURE:
return Status::Error("Failed to get meta dir!");
case nebula::cpp2::ErrorCode::E_LIST_CLUSTER_NO_AGENT_FAILURE:
return Status::Error("There is no agent!");
case nebula::cpp2::ErrorCode::E_INVALID_JOB:
return Status::Error("No valid job!");
case nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE:
Expand Down
2 changes: 1 addition & 1 deletion src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
/* ListClusterInfo Failure */ \
X(E_LIST_CLUSTER_FAILURE, -2070) \
X(E_LIST_CLUSTER_GET_ABS_PATH_FAILURE, -2071) \
X(E_GET_META_DIR_FAILURE, -2072) \
X(E_LIST_CLUSTER_NO_AGENT_FAILURE, -2072) \
\
X(E_QUERY_NOT_FOUND, -2073) \
X(E_AGENT_HB_FAILUE, -2074) \
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/GflagsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ std::unordered_map<std::string, std::pair<cpp2::ConfigMode, bool>> GflagsManager
{"minloglevel", {cpp2::ConfigMode::MUTABLE, false}},
{"v", {cpp2::ConfigMode::MUTABLE, false}},
{"heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"agent_heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}},
{"slow_op_threshold_ms", {cpp2::ConfigMode::MUTABLE, false}},
{"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}},
Expand Down
2 changes: 1 addition & 1 deletion src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ static const std::unordered_map<std::string, std::pair<std::string, bool>> syste
{"users", {"__users__", true}},
{"hosts", {"__hosts__", false}},
{"versions", {"__versions__", false}},
{"machines", {"__machines__", false}},
{"machines", {"__machines__", true}},
{"host_dirs", {"__host_dirs__", false}},
{"snapshots", {"__snapshots__", false}},
{"configs", {"__configs__", true}},
Expand Down
2 changes: 1 addition & 1 deletion src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class DropHosts final : public SingleDependencyNode {
};

class ShowHosts final : public SingleDependencyNode {
// TODO(shylock) meta/storage/graph enumerate
// TODO(shylock) meta/storage/graph/agent enumerate
public:
static ShowHosts* make(QueryContext* qctx, PlanNode* dep, meta::cpp2::ListHostType type) {
return qctx->objPool()->add(new ShowHosts(qctx, dep, type));
Expand Down
2 changes: 1 addition & 1 deletion src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ enum ErrorCode {
// ListClusterInfo Failure
E_LIST_CLUSTER_FAILURE = -2070,
E_LIST_CLUSTER_GET_ABS_PATH_FAILURE = -2071,
E_GET_META_DIR_FAILURE = -2072,
E_LIST_CLUSTER_NO_AGENT_FAILURE = -2072,

E_QUERY_NOT_FOUND = -2073,
E_AGENT_HB_FAILUE = -2074,
Expand Down
1 change: 1 addition & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ enum ListHostType {
GRAPH = 0x01,
META = 0x02,
STORAGE = 0x03,
AGENT = 0x04,
} (cpp.enum_strict)

struct ListHostsReq {
Expand Down
10 changes: 7 additions & 3 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "meta/processors/Common.h"

DECLARE_int32(heartbeat_interval_secs);
DEFINE_int32(agent_heartbeat_interval_secs, 60, "Agent heartbeat interval in seconds");
DECLARE_uint32(expired_time_factor);

namespace nebula {
Expand Down Expand Up @@ -144,9 +145,12 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> ActiveHostsMan::getActiv
}

std::vector<HostAddr> hosts;
int64_t threshold =
(expiredTTL == 0 ? FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor : expiredTTL) *
1000;
int64_t expiredTime =
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor; // meta/storage/graph
if (role == cpp2::HostRole::AGENT) {
expiredTime = FLAGS_agent_heartbeat_interval_secs * FLAGS_expired_time_factor;
}
int64_t threshold = (expiredTTL == 0 ? expiredTime : expiredTTL) * 1000;
auto now = time::WallClock::fastNowInMilliSec();
while (iter->valid()) {
auto host = MetaKeyUtils::parseHostKey(iter->key());
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ void BaseProcessor<RESP>::doPut(std::vector<kvstore::KV> data) {

template <typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, std::unique_ptr<kvstore::KVIterator>>
BaseProcessor<RESP>::doPrefix(const std::string& key) {
BaseProcessor<RESP>::doPrefix(const std::string& key, bool canReadFromFollower) {
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, key, &iter);
auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, key, &iter, canReadFromFollower);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
VLOG(2) << "Prefix Failed";
return code;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class BaseProcessor {
void doPut(std::vector<kvstore::KV> data);

ErrorOr<nebula::cpp2::ErrorCode, std::unique_ptr<kvstore::KVIterator>> doPrefix(
const std::string& key);
const std::string& key, bool canReadFromFollower = false);

/**
* General get function.
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/admin/ListClusterInfoProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq& req) {
agentCount++;
}
}
if (agentCount != 1) {
if (agentCount < 1) {
LOG(ERROR) << folly::sformat("There are {} agent count is host {}", agentCount, host);
handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_FAILURE);
handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_NO_AGENT_FAILURE);
onFinished();
return;
}
Expand Down
10 changes: 7 additions & 3 deletions src/meta/processors/admin/RestoreProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr&
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED;
const auto& spacePrefix = MetaKeyUtils::spacePrefix();
auto iterRet = doPrefix(spacePrefix);
auto iterRet = doPrefix(spacePrefix, direct);
if (!nebula::ok(iterRet)) {
retCode = nebula::error(iterRet);
LOG(ERROR) << "Space prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
Expand All @@ -37,7 +37,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr&

for (const auto& spaceId : allSpaceId) {
const auto& partPrefix = MetaKeyUtils::partPrefix(spaceId);
auto iterPartRet = doPrefix(partPrefix);
auto iterPartRet = doPrefix(partPrefix, direct);
if (!nebula::ok(iterPartRet)) {
retCode = nebula::error(iterPartRet);
LOG(ERROR) << "Part prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
Expand Down Expand Up @@ -87,7 +87,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(const HostAddr& ipv4
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED;
const auto& zonePrefix = MetaKeyUtils::zonePrefix();
auto iterRet = doPrefix(zonePrefix);
auto iterRet = doPrefix(zonePrefix, direct);
if (!nebula::ok(iterRet)) {
retCode = nebula::error(iterRet);
LOG(ERROR) << "Zone prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
Expand Down Expand Up @@ -153,6 +153,10 @@ void RestoreProcessor::process(const cpp2::RestoreMetaReq& req) {
auto replaceHosts = req.get_hosts();
if (!replaceHosts.empty()) {
for (auto h : replaceHosts) {
if (h.get_from_host() == h.get_to_host()) {
continue;
}

auto result = replaceHostInPartition(h.get_from_host(), h.get_to_host(), true);
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "replaceHost in partition fails when recovered";
Expand Down
11 changes: 9 additions & 2 deletions src/meta/processors/parts/ListHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "version/Version.h"

DECLARE_int32(heartbeat_interval_secs);
DECLARE_int32(agent_heartbeat_interval_secs);
DECLARE_uint32(expired_time_factor);
DEFINE_int32(removed_threshold_sec,
24 * 60 * 60,
Expand All @@ -26,6 +27,8 @@ static cpp2::HostRole toHostRole(cpp2::ListHostType type) {
return cpp2::HostRole::META;
case cpp2::ListHostType::STORAGE:
return cpp2::HostRole::STORAGE;
case cpp2::ListHostType::AGENT:
return cpp2::HostRole::AGENT;
default:
return cpp2::HostRole::UNKNOWN;
}
Expand Down Expand Up @@ -133,10 +136,14 @@ nebula::cpp2::ErrorCode ListHostsProcessor::allHostsWithStatus(cpp2::HostRole ro
}

if (now - info.lastHBTimeInMilliSec_ < FLAGS_removed_threshold_sec * 1000) {
int64_t expiredTime =
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000; // meta/storage/graph
if (info.role_ == cpp2::HostRole::AGENT) {
expiredTime = FLAGS_agent_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000;
}
// If meta didn't receive a heartbeat within the expiry window (the role's heartbeat
// interval * expired_time_factor), regard the host as offline. Same as ActiveHostsMan::getActiveHosts
if (now - info.lastHBTimeInMilliSec_ <
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000) {
if (now - info.lastHBTimeInMilliSec_ < expiredTime) {
item.status_ref() = cpp2::HostStatus::ONLINE;
} else {
item.status_ref() = cpp2::HostStatus::OFFLINE;
Expand Down
4 changes: 3 additions & 1 deletion src/parser/parser.yy
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ static constexpr size_t kCommentLengthLimit = 256;
%token KW_DROP KW_REMOVE KW_SPACES KW_INGEST KW_INDEX KW_INDEXES
%token KW_IF KW_NOT KW_EXISTS KW_WITH
%token KW_BY KW_DOWNLOAD KW_HDFS KW_UUID KW_CONFIGS KW_FORCE
%token KW_GET KW_DECLARE KW_GRAPH KW_META KW_STORAGE
%token KW_GET KW_DECLARE KW_GRAPH KW_META KW_STORAGE KW_AGENT
%token KW_TTL KW_TTL_DURATION KW_TTL_COL KW_DATA KW_STOP
%token KW_FETCH KW_PROP KW_UPDATE KW_UPSERT KW_WHEN
%token KW_ORDER KW_ASC KW_LIMIT KW_SAMPLE KW_OFFSET KW_ASCENDING KW_DESCENDING
Expand Down Expand Up @@ -486,6 +486,7 @@ unreserved_keyword
| KW_GRAPH { $$ = new std::string("graph"); }
| KW_META { $$ = new std::string("meta"); }
| KW_STORAGE { $$ = new std::string("storage"); }
| KW_AGENT { $$ = new std::string("agent"); }
| KW_ALL { $$ = new std::string("all"); }
| KW_ANY { $$ = new std::string("any"); }
| KW_SINGLE { $$ = new std::string("single"); }
Expand Down Expand Up @@ -3444,6 +3445,7 @@ list_host_type
: KW_GRAPH { $$ = meta::cpp2::ListHostType::GRAPH; }
| KW_META { $$ = meta::cpp2::ListHostType::META; }
| KW_STORAGE { $$ = meta::cpp2::ListHostType::STORAGE; }
| KW_AGENT { $$ = meta::cpp2::ListHostType::AGENT; }
;

config_module_enum
Expand Down
1 change: 1 addition & 0 deletions src/parser/scanner.lex
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ LABEL_FULL_WIDTH {CN_EN_FULL_WIDTH}{CN_EN_NUM_FULL_WIDTH}*
"TTL_COL" { return TokenType::KW_TTL_COL; }
"GRAPH" { return TokenType::KW_GRAPH; }
"META" { return TokenType::KW_META; }
"AGENT" { return TokenType::KW_AGENT; }
"STORAGE" { return TokenType::KW_STORAGE; }
"SHORTEST" { return TokenType::KW_SHORTEST; }
"NOLOOP" { return TokenType::KW_NOLOOP; }
Expand Down
3 changes: 3 additions & 0 deletions src/parser/test/ScannerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,9 @@ TEST(Scanner, Basic) {
CHECK_SEMANTIC_TYPE("META", TokenType::KW_META),
CHECK_SEMANTIC_TYPE("Meta", TokenType::KW_META),
CHECK_SEMANTIC_TYPE("meta", TokenType::KW_META),
CHECK_SEMANTIC_TYPE("AGENT", TokenType::KW_AGENT),
CHECK_SEMANTIC_TYPE("Agent", TokenType::KW_AGENT),
CHECK_SEMANTIC_TYPE("agent", TokenType::KW_AGENT),
CHECK_SEMANTIC_TYPE("STORAGE", TokenType::KW_STORAGE),
CHECK_SEMANTIC_TYPE("Storage", TokenType::KW_STORAGE),
CHECK_SEMANTIC_TYPE("storage", TokenType::KW_STORAGE),
Expand Down