Skip to content

Commit

Permalink
refactor job manager part2 (#4001)
Browse files Browse the repository at this point in the history
* refactor job manager

fuck
adjust admintask
fix rebuld tag/edge index
refactor 'show tag index status' and 'show edge index status'
support JobManager::checkTypeJobRunning

* fix conflict

* address pengweisong's comments

* improve job manager UT

* address critical27's comment
  • Loading branch information
panda-sheep authored Mar 23, 2022
1 parent 6b0d402 commit dd270fb
Show file tree
Hide file tree
Showing 69 changed files with 1,371 additions and 1,470 deletions.
7 changes: 5 additions & 2 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ Status MetaClient::handleResponse(const RESP& resp) {
case nebula::cpp2::ErrorCode::E_INVALID_JOB:
return Status::Error("No valid job!");
case nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE:
return Status::Error("Job not in chosen space!");
return Status::Error("Job not existed in chosen space!");
case nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE:
return Status::Error("Backup empty table!");
case nebula::cpp2::ErrorCode::E_BACKUP_TABLE_FAILED:
Expand Down Expand Up @@ -1149,8 +1149,9 @@ PartitionID MetaClient::partId(int32_t numParts, const VertexID id) const {
}

folly::Future<StatusOr<cpp2::AdminJobResult>> MetaClient::submitJob(
cpp2::JobOp op, cpp2::JobType type, std::vector<std::string> paras) {
GraphSpaceID spaceId, cpp2::JobOp op, cpp2::JobType type, std::vector<std::string> paras) {
cpp2::AdminJobReq req;
req.space_id_ref() = spaceId;
req.op_ref() = op;
req.type_ref() = type;
req.paras_ref() = std::move(paras);
Expand Down Expand Up @@ -3235,12 +3236,14 @@ folly::Future<StatusOr<cpp2::StatsItem>> MetaClient::getStats(GraphSpaceID space
}

folly::Future<StatusOr<nebula::cpp2::ErrorCode>> MetaClient::reportTaskFinish(
GraphSpaceID spaceId,
int32_t jobId,
int32_t taskId,
nebula::cpp2::ErrorCode taskErrCode,
cpp2::StatsItem* statisticItem) {
cpp2::ReportTaskReq req;
req.code_ref() = taskErrCode;
req.space_id_ref() = spaceId;
req.job_id_ref() = jobId;
req.task_id_ref() = taskId;
if (statisticItem) {
Expand Down
4 changes: 3 additions & 1 deletion src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ class MetaClient : public BaseMetaClient {
listener_ = nullptr;
}

folly::Future<StatusOr<cpp2::AdminJobResult>> submitJob(cpp2::JobOp op,
folly::Future<StatusOr<cpp2::AdminJobResult>> submitJob(GraphSpaceID spaceId,
cpp2::JobOp op,
cpp2::JobType type,
std::vector<std::string> paras);

Expand Down Expand Up @@ -627,6 +628,7 @@ class MetaClient : public BaseMetaClient {
folly::Future<StatusOr<cpp2::StatsItem>> getStats(GraphSpaceID spaceId);

folly::Future<StatusOr<nebula::cpp2::ErrorCode>> reportTaskFinish(
GraphSpaceID spaceId,
int32_t jobId,
int32_t taskId,
nebula::cpp2::ErrorCode taskErrCode,
Expand Down
149 changes: 148 additions & 1 deletion src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ static const std::unordered_map<
{"balance_plan", {"__balance_plan__", nullptr}},
{"ft_index", {"__ft_index__", nullptr}},
{"local_id", {"__local_id__", MetaKeyUtils::parseLocalIdSpace}},
{"disk_parts", {"__disk_parts__", MetaKeyUtils::parseDiskPartsSpace}}};
{"disk_parts", {"__disk_parts__", MetaKeyUtils::parseDiskPartsSpace}},
{"job_manager", {"__job_mgr__", nullptr}}};

// clang-format off
static const std::string kSpacesTable = tableMaps.at("spaces").first; // NOLINT
Expand All @@ -80,6 +81,18 @@ static const std::string kZonesTable = systemTableMaps.at("zones").firs
static const std::string kListenerTable = tableMaps.at("listener").first; // NOLINT
static const std::string kDiskPartsTable = tableMaps.at("disk_parts").first; // NOLINT

/*
* There will be one job, and a bunch of tasks use this prefix.
* If there are 1 job(let say 65536) which has 4 sub tasks, there will 5 records
* in kvstore which are
* __job_mgr_<65536>
* __job_mgr_<65536><0>
* __job_mgr_<65536><1>
* __job_mgr_<65536><2>
* __job_mgr_<65536><3>
* */
static const std::string kJobTable = tableMaps.at("job_manager").first; // NOLINT

// Used to record the number of vertices and edges in the space
// The number of vertices of each tag in the space
// The number of edges of each edgetype in the space
Expand Down Expand Up @@ -1354,4 +1367,138 @@ meta::cpp2::PartitionList MetaKeyUtils::parseDiskPartsVal(const folly::StringPie
return partList;
}

const std::string& MetaKeyUtils::jobPrefix() {
return kJobTable;
}

std::string MetaKeyUtils::jobPrefix(GraphSpaceID spaceId) {
std::string key;
key.reserve(kJobTable.size() + sizeof(GraphSpaceID));
key.append(kJobTable.data(), kJobTable.size())
.append(reinterpret_cast<const char*>(&spaceId), sizeof(GraphSpaceID));
return key;
}

std::string MetaKeyUtils::jobKey(GraphSpaceID spaceID, JobID jobId) {
std::string key;
key.reserve(kJobTable.size() + sizeof(GraphSpaceID) + sizeof(JobID));
key.append(kJobTable.data(), kJobTable.size())
.append(reinterpret_cast<const char*>(&spaceID), sizeof(GraphSpaceID))
.append(reinterpret_cast<const char*>(&jobId), sizeof(JobID));
return key;
}

bool MetaKeyUtils::isJobKey(const folly::StringPiece& rawKey) {
if (!rawKey.startsWith(kJobTable)) {
return false;
}
return rawKey.size() == kJobTable.size() + sizeof(GraphSpaceID) + sizeof(JobID);
}

std::string MetaKeyUtils::jobVal(const meta::cpp2::JobType& type,
std::vector<std::string> paras,
meta::cpp2::JobStatus jobStatus,
int64_t startTime,
int64_t stopTime) {
std::string val;
val.reserve(256);
val.append(reinterpret_cast<const char*>(&type), sizeof(meta::cpp2::JobType));
auto paraSize = paras.size();
val.append(reinterpret_cast<const char*>(&paraSize), sizeof(size_t));
for (auto& para : paras) {
auto len = para.length();
val.append(reinterpret_cast<const char*>(&len), sizeof(len))
.append(reinterpret_cast<const char*>(&para[0]), len);
}
val.append(reinterpret_cast<const char*>(&jobStatus), sizeof(meta::cpp2::JobStatus))
.append(reinterpret_cast<const char*>(&startTime), sizeof(int64_t))
.append(reinterpret_cast<const char*>(&stopTime), sizeof(int64_t));
return val;
}

std::tuple<meta::cpp2::JobType, std::vector<std::string>, meta::cpp2::JobStatus, int64_t, int64_t>
MetaKeyUtils::parseJobVal(folly::StringPiece rawVal) {
CHECK_GE(rawVal.size(),
sizeof(meta::cpp2::JobType) + sizeof(size_t) + sizeof(meta::cpp2::JobStatus) +
sizeof(int64_t) * 2);
auto type = *reinterpret_cast<const meta::cpp2::JobType*>(rawVal.data());
auto offset = sizeof(const meta::cpp2::JobType);

std::vector<std::string> paras;
auto vec_size = *reinterpret_cast<const size_t*>(rawVal.data() + offset);
offset += sizeof(size_t);
for (size_t i = 0; i < vec_size; ++i) {
auto len = *reinterpret_cast<const size_t*>(rawVal.data() + offset);
offset += sizeof(size_t);
paras.emplace_back(rawVal.data() + offset, len);
offset += len;
}

auto status = *reinterpret_cast<const meta::cpp2::JobStatus*>(rawVal.data() + offset);
offset += sizeof(meta::cpp2::JobStatus);
auto tStart = *reinterpret_cast<const int64_t*>(rawVal.data() + offset);
offset += sizeof(int64_t);
auto tStop = *reinterpret_cast<const int64_t*>(rawVal.data() + offset);
return std::make_tuple(type, paras, status, tStart, tStop);
}

std::pair<GraphSpaceID, JobID> MetaKeyUtils::parseJobKey(folly::StringPiece key) {
auto offset = kJobTable.size();
auto spaceId = *reinterpret_cast<const GraphSpaceID*>(key.data() + offset);
offset += sizeof(GraphSpaceID);
auto jobId = *reinterpret_cast<const JobID*>(key.data() + offset);
return std::make_pair(spaceId, jobId);
}

std::string MetaKeyUtils::taskKey(GraphSpaceID spaceId, JobID jobId, TaskID taskId) {
std::string key;
key.reserve(kJobTable.size() + sizeof(GraphSpaceID) + sizeof(JobID) + sizeof(TaskID));
key.append(kJobTable.data(), kJobTable.size())
.append(reinterpret_cast<const char*>(&spaceId), sizeof(GraphSpaceID))
.append(reinterpret_cast<const char*>(&jobId), sizeof(JobID))
.append(reinterpret_cast<const char*>(&taskId), sizeof(TaskID));
return key;
}

std::tuple<GraphSpaceID, JobID, TaskID> MetaKeyUtils::parseTaskKey(folly::StringPiece key) {
auto offset = kJobTable.size();
auto spaceId = *reinterpret_cast<const GraphSpaceID*>(key.data() + offset);
offset += sizeof(GraphSpaceID);
auto jobId = *reinterpret_cast<const JobID*>(key.data() + offset);
offset += sizeof(JobID);
auto taskId = *reinterpret_cast<const TaskID*>(key.data() + offset);
return std::make_tuple(spaceId, jobId, taskId);
}

std::string MetaKeyUtils::taskVal(HostAddr host,
meta::cpp2::JobStatus jobStatus,
int64_t startTime,
int64_t stopTime) {
std::string val;
val.reserve(128);
val.append(MetaKeyUtils::serializeHostAddr(host))
.append(reinterpret_cast<const char*>(&jobStatus), sizeof(meta::cpp2::JobStatus))
.append(reinterpret_cast<const char*>(&startTime), sizeof(int64_t))
.append(reinterpret_cast<const char*>(&stopTime), sizeof(int64_t));
return val;
}

std::tuple<HostAddr, meta::cpp2::JobStatus, int64_t, int64_t> MetaKeyUtils::parseTaskVal(
folly::StringPiece rawVal) {
CHECK_GE(rawVal.size(),
sizeof(size_t) + sizeof(Port) + sizeof(meta::cpp2::JobStatus) + sizeof(int64_t) * 2);
size_t offset = 0;
HostAddr host = MetaKeyUtils::deserializeHostAddr(rawVal);
offset += sizeof(size_t);
offset += host.host.size();
offset += sizeof(uint32_t);

auto status = *reinterpret_cast<const meta::cpp2::JobStatus*>(rawVal.data() + offset);
offset += sizeof(meta::cpp2::JobStatus);
auto tStart = *reinterpret_cast<const int64_t*>(rawVal.data() + offset);
offset += sizeof(int64_t);
auto tStop = *reinterpret_cast<const int64_t*>(rawVal.data() + offset);
return std::make_tuple(host, status, tStart, tStop);
}

} // namespace nebula
48 changes: 48 additions & 0 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,54 @@ class MetaKeyUtils final {
static std::string diskPartsVal(const meta::cpp2::PartitionList& partList);

static meta::cpp2::PartitionList parseDiskPartsVal(const folly::StringPiece& rawData);

// job related
static const std::string& jobPrefix();

static std::string jobPrefix(GraphSpaceID spaceId);

/**
* @brief Encoded job key, it should be
* kJobTable+spceId+jobid
*/
static std::string jobKey(GraphSpaceID spaceId, JobID iJob);

static bool isJobKey(const folly::StringPiece& rawKey);

static std::string jobVal(const meta::cpp2::JobType& type,
std::vector<std::string> paras,
meta::cpp2::JobStatus jobStatus,
int64_t startTime,
int64_t stopTime);
/**
* @brief Decode val from kvstore, return
* {jobType, paras, status, start time, stop time}
*/
static std::
tuple<meta::cpp2::JobType, std::vector<std::string>, meta::cpp2::JobStatus, int64_t, int64_t>
parseJobVal(folly::StringPiece rawVal);

static std::pair<GraphSpaceID, JobID> parseJobKey(folly::StringPiece key);

// task related
/**
* @brief Encoded task key, it should bekJobTable+spceId+jobid+taskid
*/
static std::string taskKey(GraphSpaceID spaceId, JobID jobId, TaskID taskId);

static std::tuple<GraphSpaceID, JobID, TaskID> parseTaskKey(folly::StringPiece key);

static std::string taskVal(HostAddr host,
meta::cpp2::JobStatus jobStatus,
int64_t startTime,
int64_t stopTime);

/**
* @brief Decode task val,it should be
* {host, status, start time, stop time}
*/
static std::tuple<HostAddr, meta::cpp2::JobStatus, int64_t, int64_t> parseTaskVal(
folly::StringPiece rawVal);
};

} // namespace nebula
Expand Down
32 changes: 27 additions & 5 deletions src/common/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,37 @@ std::string NebulaKeyUtils::toEdgeKey(const folly::StringPiece& lockKey) {
return ret;
}

std::string NebulaKeyUtils::adminTaskKey(int32_t seqId, JobID jobId, TaskID taskId) {
std::string NebulaKeyUtils::adminTaskKey(int32_t seqId,
GraphSpaceID spaceId,
JobID jobId,
TaskID taskId) {
std::string key;
key.reserve(sizeof(int32_t) + sizeof(JobID) + sizeof(TaskID));
key.append(reinterpret_cast<char*>(&seqId), sizeof(int32_t));
key.append(reinterpret_cast<char*>(&jobId), sizeof(JobID));
key.append(reinterpret_cast<char*>(&taskId), sizeof(TaskID));
key.reserve(sizeof(int32_t) + sizeof(GraphSpaceID) + sizeof(JobID) + sizeof(TaskID));
key.append(reinterpret_cast<char*>(&seqId), sizeof(int32_t))
.append(reinterpret_cast<char*>(&spaceId), sizeof(GraphSpaceID))
.append(reinterpret_cast<char*>(&jobId), sizeof(JobID))
.append(reinterpret_cast<char*>(&taskId), sizeof(TaskID));
return key;
}

bool NebulaKeyUtils::isAdminTaskKey(const folly::StringPiece& rawKey) {
return rawKey.size() == sizeof(int32_t) + sizeof(GraphSpaceID) + sizeof(JobID) + sizeof(TaskID);
}

std::tuple<int32_t, GraphSpaceID, JobID, TaskID> NebulaKeyUtils::parseAdminTaskKey(
folly::StringPiece key) {
CHECK_EQ(key.size(), sizeof(int32_t) + sizeof(GraphSpaceID) + sizeof(JobID) + sizeof(TaskID));
size_t offset = 0;
int32_t seqId = *reinterpret_cast<const int32_t*>(key.data());
offset += sizeof(int32_t);
GraphSpaceID spaceId = *reinterpret_cast<const GraphSpaceID*>(key.data() + offset);
offset += sizeof(GraphSpaceID);
JobID jobId = *reinterpret_cast<const JobID*>(key.data() + offset);
offset += sizeof(JobID);
TaskID taskId = *reinterpret_cast<const TaskID*>(key.data() + offset);
return std::make_tuple(seqId, spaceId, jobId, taskId);
}

std::string NebulaKeyUtils::dataVersionKey() {
return "\xFF\xFF\xFF\xFF";
}
Expand Down
6 changes: 5 additions & 1 deletion src/common/utils/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,11 @@ class NebulaKeyUtils final {
LOG(FATAL) << msg.str();
}

static std::string adminTaskKey(int32_t seqId, JobID jobId, TaskID taskId);
static std::string adminTaskKey(int32_t seqId, GraphSpaceID spaceId, JobID jobId, TaskID taskId);

static bool isAdminTaskKey(const folly::StringPiece& rawKey);

static std::tuple<int32_t, GraphSpaceID, JobID, TaskID> parseAdminTaskKey(folly::StringPiece key);

static std::string dataVersionKey();

Expand Down
9 changes: 5 additions & 4 deletions src/graph/executor/admin/SubmitJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ folly::Future<Status> SubmitJobExecutor::execute() {
SCOPED_TIMER(&execTime_);

auto *sjNode = asNode<SubmitJob>(node());
auto spaceId = qctx()->rctx()->session()->space().id;
auto jobOp = sjNode->jobOp();
auto jobType = sjNode->jobType();
auto params = sjNode->params();

return qctx()
->getMetaClient()
->submitJob(jobOp, jobType, params)
->submitJob(spaceId, jobOp, jobType, params)
.via(runner())
.thenValue([jobOp, this](StatusOr<meta::cpp2::AdminJobResult> &&resp) {
SCOPED_TIMER(&execTime_);
Expand Down Expand Up @@ -82,7 +83,7 @@ StatusOr<DataSet> SubmitJobExecutor::buildResult(meta::cpp2::JobOp jobOp,
const auto &jobsDesc = *resp.job_desc_ref();
for (const auto &jobDesc : jobsDesc) {
v.emplace_back(nebula::Row({
jobDesc.get_id(),
jobDesc.get_job_id(),
apache::thrift::util::enumNameSafe(jobDesc.get_type()),
apache::thrift::util::enumNameSafe(jobDesc.get_status()),
convertJobTimestampToDateTime(jobDesc.get_start_time()),
Expand Down Expand Up @@ -117,7 +118,7 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
size_t index = std::stoul(paras.back());
uint32_t total = paras.size() - index - 1, succeeded = 0, failed = 0, inProgress = 0,
invalid = 0;
v.emplace_back(Row({jd.get_id(),
v.emplace_back(Row({jd.get_job_id(),
apache::thrift::util::enumNameSafe(jd.get_type()),
apache::thrift::util::enumNameSafe(jd.get_status()),
convertJobTimestampToDateTime(jd.get_start_time()).toString(),
Expand Down Expand Up @@ -154,7 +155,7 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
} else {
nebula::DataSet v({"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time"});
v.emplace_back(nebula::Row({
jd.get_id(),
jd.get_job_id(),
apache::thrift::util::enumNameSafe(jd.get_type()),
apache::thrift::util::enumNameSafe(jd.get_status()),
convertJobTimestampToDateTime(jd.get_start_time()),
Expand Down
4 changes: 2 additions & 2 deletions src/graph/executor/test/JobTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ TEST_F(JobTest, JobFinishTime) {
meta::cpp2::AdminJobResult resp;
resp.job_id_ref() = 0;
meta::cpp2::JobDesc jobDesc;
jobDesc.id_ref() = 0;
jobDesc.job_id_ref() = 0;
jobDesc.start_time_ref() = 123;
jobDesc.stop_time_ref() = 0;
resp.job_desc_ref() = {std::move(jobDesc)};
Expand All @@ -46,7 +46,7 @@ TEST_F(JobTest, JobFinishTime) {
meta::cpp2::AdminJobResult resp;
resp.job_id_ref() = 0;
meta::cpp2::JobDesc jobDesc;
jobDesc.id_ref() = 0;
jobDesc.job_id_ref() = 0;
jobDesc.start_time_ref() = 123;
jobDesc.stop_time_ref() = 0;
resp.job_desc_ref() = {std::move(jobDesc)};
Expand Down
Loading

0 comments on commit dd270fb

Please sign in to comment.