Skip to content

Commit

Permalink
rename AdminCmd to JobType
Browse files Browse the repository at this point in the history
  • Loading branch information
panda-sheep committed Mar 17, 2022
1 parent a57acd0 commit c8f10c3
Show file tree
Hide file tree
Showing 39 changed files with 284 additions and 293 deletions.
4 changes: 2 additions & 2 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1149,10 +1149,10 @@ PartitionID MetaClient::partId(int32_t numParts, const VertexID id) const {
}

folly::Future<StatusOr<cpp2::AdminJobResult>> MetaClient::submitJob(
cpp2::AdminJobOp op, cpp2::AdminCmd cmd, std::vector<std::string> paras) {
cpp2::JobOp op, cpp2::JobType type, std::vector<std::string> paras) {
cpp2::AdminJobReq req;
req.op_ref() = op;
req.cmd_ref() = cmd;
req.type_ref() = type;
req.paras_ref() = std::move(paras);
folly::Promise<StatusOr<cpp2::AdminJobResult>> promise;
auto future = promise.getFuture();
Expand Down
4 changes: 2 additions & 2 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ class MetaClient {
listener_ = nullptr;
}

folly::Future<StatusOr<cpp2::AdminJobResult>> submitJob(cpp2::AdminJobOp op,
cpp2::AdminCmd cmd,
folly::Future<StatusOr<cpp2::AdminJobResult>> submitJob(cpp2::JobOp op,
cpp2::JobType type,
std::vector<std::string> paras);

// Operations for parts
Expand Down
26 changes: 13 additions & 13 deletions src/graph/executor/admin/SubmitJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ folly::Future<Status> SubmitJobExecutor::execute() {

auto *sjNode = asNode<SubmitJob>(node());
auto jobOp = sjNode->jobOp();
auto cmd = sjNode->cmd();
auto jobType = sjNode->jobType();
auto params = sjNode->params();

return qctx()
->getMetaClient()
->submitJob(jobOp, cmd, params)
->submitJob(jobOp, jobType, params)
.via(runner())
.thenValue([jobOp, this](StatusOr<meta::cpp2::AdminJobResult> &&resp) {
SCOPED_TIMER(&execTime_);
Expand All @@ -40,10 +40,10 @@ folly::Future<Status> SubmitJobExecutor::execute() {
});
}

StatusOr<DataSet> SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp,
StatusOr<DataSet> SubmitJobExecutor::buildResult(meta::cpp2::JobOp jobOp,
meta::cpp2::AdminJobResult &&resp) {
switch (jobOp) {
case meta::cpp2::AdminJobOp::ADD: {
case meta::cpp2::JobOp::ADD: {
nebula::DataSet v({"New Job Id"});
DCHECK(resp.job_id_ref().has_value());
if (!resp.job_id_ref().has_value()) {
Expand All @@ -52,7 +52,7 @@ StatusOr<DataSet> SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp,
v.emplace_back(nebula::Row({*resp.job_id_ref()}));
return v;
}
case meta::cpp2::AdminJobOp::RECOVER: {
case meta::cpp2::JobOp::RECOVER: {
nebula::DataSet v({"Recovered job num"});
DCHECK(resp.recovered_job_num_ref().has_value());
if (!resp.recovered_job_num_ref().has_value()) {
Expand All @@ -61,7 +61,7 @@ StatusOr<DataSet> SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp,
v.emplace_back(nebula::Row({*resp.recovered_job_num_ref()}));
return v;
}
case meta::cpp2::AdminJobOp::SHOW: {
case meta::cpp2::JobOp::SHOW: {
DCHECK(resp.job_desc_ref().has_value());
if (!resp.job_desc_ref().has_value()) {
return Status::Error("Response unexpected.");
Expand All @@ -73,7 +73,7 @@ StatusOr<DataSet> SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp,
auto &jobDesc = *resp.job_desc_ref();
return buildShowResultData(jobDesc.front(), *resp.get_task_desc());
}
case meta::cpp2::AdminJobOp::SHOW_All: {
case meta::cpp2::JobOp::SHOW_All: {
nebula::DataSet v({"Job Id", "Command", "Status", "Start Time", "Stop Time"});
DCHECK(resp.job_desc_ref().has_value());
if (!resp.job_desc_ref().has_value()) {
Expand All @@ -83,15 +83,15 @@ StatusOr<DataSet> SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp,
for (const auto &jobDesc : jobsDesc) {
v.emplace_back(nebula::Row({
jobDesc.get_id(),
apache::thrift::util::enumNameSafe(jobDesc.get_cmd()),
apache::thrift::util::enumNameSafe(jobDesc.get_type()),
apache::thrift::util::enumNameSafe(jobDesc.get_status()),
convertJobTimestampToDateTime(jobDesc.get_start_time()),
convertJobTimestampToDateTime(jobDesc.get_stop_time()),
}));
}
return v;
}
case meta::cpp2::AdminJobOp::STOP: {
case meta::cpp2::JobOp::STOP: {
nebula::DataSet v({"Result"});
v.emplace_back(nebula::Row({"Job stopped"}));
return v;
Expand All @@ -109,16 +109,16 @@ Value SubmitJobExecutor::convertJobTimestampToDateTime(int64_t timestamp) {

nebula::DataSet SubmitJobExecutor::buildShowResultData(
const nebula::meta::cpp2::JobDesc &jd, const std::vector<nebula::meta::cpp2::TaskDesc> &td) {
if (jd.get_cmd() == meta::cpp2::AdminCmd::DATA_BALANCE ||
jd.get_cmd() == meta::cpp2::AdminCmd::ZONE_BALANCE) {
if (jd.get_type() == meta::cpp2::JobType::DATA_BALANCE ||
jd.get_type() == meta::cpp2::JobType::ZONE_BALANCE) {
nebula::DataSet v(
{"Job Id(spaceId:partId)", "Command(src->dst)", "Status", "Start Time", "Stop Time"});
const auto &paras = jd.get_paras();
size_t index = std::stoul(paras.back());
uint32_t total = paras.size() - index - 1, succeeded = 0, failed = 0, inProgress = 0,
invalid = 0;
v.emplace_back(Row({jd.get_id(),
apache::thrift::util::enumNameSafe(jd.get_cmd()),
apache::thrift::util::enumNameSafe(jd.get_type()),
apache::thrift::util::enumNameSafe(jd.get_status()),
convertJobTimestampToDateTime(jd.get_start_time()).toString(),
convertJobTimestampToDateTime(jd.get_stop_time()).toString()}));
Expand Down Expand Up @@ -155,7 +155,7 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
nebula::DataSet v({"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time"});
v.emplace_back(nebula::Row({
jd.get_id(),
apache::thrift::util::enumNameSafe(jd.get_cmd()),
apache::thrift::util::enumNameSafe(jd.get_type()),
apache::thrift::util::enumNameSafe(jd.get_status()),
convertJobTimestampToDateTime(jd.get_start_time()),
convertJobTimestampToDateTime(jd.get_stop_time()),
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/admin/SubmitJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class SubmitJobExecutor final : public Executor {

private:
FRIEND_TEST(JobTest, JobFinishTime);
StatusOr<DataSet> buildResult(meta::cpp2::AdminJobOp jobOp, meta::cpp2::AdminJobResult &&resp);
StatusOr<DataSet> buildResult(meta::cpp2::JobOp jobOp, meta::cpp2::AdminJobResult &&resp);
Value convertJobTimestampToDateTime(int64_t timestamp);
nebula::DataSet buildShowResultData(const nebula::meta::cpp2::JobDesc &jd,
const std::vector<nebula::meta::cpp2::TaskDesc> &td);
Expand Down
8 changes: 4 additions & 4 deletions src/graph/executor/test/JobTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ TEST_F(JobTest, JobFinishTime) {

auto qctx = std::make_unique<QueryContext>();
auto submitJob = SubmitJob::make(
qctx.get(), nullptr, meta::cpp2::AdminJobOp::SHOW, meta::cpp2::AdminCmd::UNKNOWN, {});
qctx.get(), nullptr, meta::cpp2::JobOp::SHOW, meta::cpp2::JobType::UNKNOWN, {});
auto submitJobExe = std::make_unique<SubmitJobExecutor>(submitJob, qctx.get());

auto status = submitJobExe->buildResult(meta::cpp2::AdminJobOp::SHOW, std::move(resp));
auto status = submitJobExe->buildResult(meta::cpp2::JobOp::SHOW, std::move(resp));
EXPECT_TRUE(status.ok());
auto result = std::move(status).value();
EXPECT_EQ(result.rows.size(), 2);
Expand All @@ -53,10 +53,10 @@ TEST_F(JobTest, JobFinishTime) {

auto qctx = std::make_unique<QueryContext>();
auto submitJob = SubmitJob::make(
qctx.get(), nullptr, meta::cpp2::AdminJobOp::SHOW_All, meta::cpp2::AdminCmd::UNKNOWN, {});
qctx.get(), nullptr, meta::cpp2::JobOp::SHOW_All, meta::cpp2::JobType::UNKNOWN, {});
auto submitJobExe = std::make_unique<SubmitJobExecutor>(submitJob, qctx.get());

auto status = submitJobExe->buildResult(meta::cpp2::AdminJobOp::SHOW_All, std::move(resp));
auto status = submitJobExe->buildResult(meta::cpp2::JobOp::SHOW_All, std::move(resp));
EXPECT_TRUE(status.ok());
auto result = std::move(status).value();
EXPECT_EQ(result.rows.size(), 1);
Expand Down
2 changes: 1 addition & 1 deletion src/graph/planner/plan/Admin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ std::unique_ptr<PlanNodeDescription> ListRoles::explain() const {
std::unique_ptr<PlanNodeDescription> SubmitJob::explain() const {
auto desc = SingleDependencyNode::explain();
addDescription("operation", apache::thrift::util::enumNameSafe(op_), desc.get());
addDescription("command", apache::thrift::util::enumNameSafe(cmd_), desc.get());
addDescription("command", apache::thrift::util::enumNameSafe(type_), desc.get());
addDescription("parameters", folly::toJson(util::toJson(params_)), desc.get());
return desc;
}
Expand Down
20 changes: 10 additions & 10 deletions src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -908,21 +908,21 @@ class SubmitJob final : public SingleDependencyNode {
public:
static SubmitJob* make(QueryContext* qctx,
PlanNode* dep,
meta::cpp2::AdminJobOp op,
meta::cpp2::AdminCmd cmd,
meta::cpp2::JobOp op,
meta::cpp2::JobType cmd,
const std::vector<std::string>& params) {
return qctx->objPool()->add(new SubmitJob(qctx, dep, op, cmd, params));
}

std::unique_ptr<PlanNodeDescription> explain() const override;

public:
meta::cpp2::AdminJobOp jobOp() const {
meta::cpp2::JobOp jobOp() const {
return op_;
}

meta::cpp2::AdminCmd cmd() const {
return cmd_;
meta::cpp2::JobType jobType() const {
return type_;
}

const std::vector<std::string>& params() const {
Expand All @@ -932,14 +932,14 @@ class SubmitJob final : public SingleDependencyNode {
private:
SubmitJob(QueryContext* qctx,
PlanNode* dep,
meta::cpp2::AdminJobOp op,
meta::cpp2::AdminCmd cmd,
meta::cpp2::JobOp op,
meta::cpp2::JobType type,
const std::vector<std::string>& params)
: SingleDependencyNode(qctx, Kind::kSubmitJob, dep), op_(op), cmd_(cmd), params_(params) {}
: SingleDependencyNode(qctx, Kind::kSubmitJob, dep), op_(op), type_(type), params_(params) {}

private:
meta::cpp2::AdminJobOp op_;
meta::cpp2::AdminCmd cmd_;
meta::cpp2::JobOp op_;
meta::cpp2::JobType type_;
const std::vector<std::string> params_;
};

Expand Down
16 changes: 8 additions & 8 deletions src/graph/validator/AdminJobValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ namespace nebula {
namespace graph {

Status AdminJobValidator::validateImpl() {
if (sentence_->getCmd() == meta::cpp2::AdminCmd::DATA_BALANCE ||
sentence_->getCmd() == meta::cpp2::AdminCmd::ZONE_BALANCE) {
if (sentence_->getJobType() == meta::cpp2::JobType::DATA_BALANCE ||
sentence_->getJobType() == meta::cpp2::JobType::ZONE_BALANCE) {
return Status::SemanticError("Data balance not support");
}
if (sentence_->getOp() == meta::cpp2::AdminJobOp::ADD) {
auto cmd = sentence_->getCmd();
if (sentence_->getOp() == meta::cpp2::JobOp::ADD) {
auto jobType = sentence_->getJobType();
if (requireSpace()) {
const auto &spaceInfo = vctx_->whichSpace();
auto spaceId = spaceInfo.id;
const auto &spaceName = spaceInfo.name;
sentence_->addPara(spaceName);

if (cmd == meta::cpp2::AdminCmd::REBUILD_TAG_INDEX ||
cmd == meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX) {
auto ret = cmd == meta::cpp2::AdminCmd::REBUILD_TAG_INDEX
if (jobType == meta::cpp2::JobType::REBUILD_TAG_INDEX ||
jobType == meta::cpp2::JobType::REBUILD_EDGE_INDEX) {
auto ret = jobType == meta::cpp2::JobType::REBUILD_TAG_INDEX
? qctx()->indexMng()->getTagIndexes(spaceId)
: qctx()->indexMng()->getEdgeIndexes(spaceId);
if (!ret.ok()) {
Expand Down Expand Up @@ -60,7 +60,7 @@ Status AdminJobValidator::validateImpl() {

Status AdminJobValidator::toPlan() {
auto *doNode = SubmitJob::make(
qctx_, nullptr, sentence_->getOp(), sentence_->getCmd(), sentence_->getParas());
qctx_, nullptr, sentence_->getOp(), sentence_->getJobType(), sentence_->getParas());
root_ = doNode;
tail_ = root_;
return Status::OK();
Expand Down
36 changes: 18 additions & 18 deletions src/graph/validator/AdminJobValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,31 @@ class AdminJobValidator final : public Validator {

bool requireSpace() const {
switch (sentence_->getOp()) {
case meta::cpp2::AdminJobOp::ADD:
switch (sentence_->getCmd()) {
case meta::cpp2::JobOp::ADD:
switch (sentence_->getJobType()) {
// All jobs are space-level, except for the jobs that need to be refactored.
case meta::cpp2::AdminCmd::REBUILD_TAG_INDEX:
case meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX:
case meta::cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX:
case meta::cpp2::AdminCmd::STATS:
case meta::cpp2::AdminCmd::COMPACT:
case meta::cpp2::AdminCmd::FLUSH:
case meta::cpp2::AdminCmd::DATA_BALANCE:
case meta::cpp2::AdminCmd::LEADER_BALANCE:
case meta::cpp2::AdminCmd::ZONE_BALANCE:
case meta::cpp2::JobType::REBUILD_TAG_INDEX:
case meta::cpp2::JobType::REBUILD_EDGE_INDEX:
case meta::cpp2::JobType::REBUILD_FULLTEXT_INDEX:
case meta::cpp2::JobType::STATS:
case meta::cpp2::JobType::COMPACT:
case meta::cpp2::JobType::FLUSH:
case meta::cpp2::JobType::DATA_BALANCE:
case meta::cpp2::JobType::LEADER_BALANCE:
case meta::cpp2::JobType::ZONE_BALANCE:
return true;
// TODO: download and ingest need to be refactored to use the rpc protocol.
// Currently they are using their own validator
case meta::cpp2::AdminCmd::DOWNLOAD:
case meta::cpp2::AdminCmd::INGEST:
case meta::cpp2::AdminCmd::UNKNOWN:
case meta::cpp2::JobType::DOWNLOAD:
case meta::cpp2::JobType::INGEST:
case meta::cpp2::JobType::UNKNOWN:
return false;
}
break;
case meta::cpp2::AdminJobOp::SHOW_All:
case meta::cpp2::AdminJobOp::SHOW:
case meta::cpp2::AdminJobOp::STOP:
case meta::cpp2::AdminJobOp::RECOVER:
case meta::cpp2::JobOp::SHOW_All:
case meta::cpp2::JobOp::SHOW:
case meta::cpp2::JobOp::STOP:
case meta::cpp2::JobOp::RECOVER:
return true;
}
return false;
Expand Down
18 changes: 9 additions & 9 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -220,21 +220,15 @@ struct AlterSpaceReq {
}

// Job related data structures
enum AdminJobOp {
enum JobOp {
ADD = 0x01,
SHOW_All = 0x02,
SHOW = 0x03,
STOP = 0x04,
RECOVER = 0x05,
} (cpp.enum_strict)

struct AdminJobReq {
1: AdminJobOp op,
2: AdminCmd cmd,
3: list<binary> paras,
}

enum AdminCmd {
enum JobType {
COMPACT = 0,
FLUSH = 1,
REBUILD_TAG_INDEX = 2,
Expand All @@ -249,6 +243,12 @@ enum AdminCmd {
UNKNOWN = 99,
} (cpp.enum_strict)

struct AdminJobReq {
1: JobOp op,
2: JobType type,
3: list<binary> paras,
}

enum JobStatus {
QUEUE = 0x01,
RUNNING = 0x02,
Expand All @@ -260,7 +260,7 @@ enum JobStatus {

struct JobDesc {
1: i32 id,
2: AdminCmd cmd,
2: JobType type,
3: list<string> paras,
4: JobStatus status,
5: i64 start_time,
Expand Down
2 changes: 1 addition & 1 deletion src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ struct ListClusterInfoReq {

struct AddTaskRequest {
// Task distributed to storage to execute, e.g. flush, compact, stats, etc.
1: meta.AdminCmd cmd
1: meta.JobType job_type
2: i32 job_id
3: i32 task_id
4: TaskPara para
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ folly::Future<StatusOr<bool>> AdminClient::blockingWrites(const std::set<GraphSp
}

folly::Future<StatusOr<bool>> AdminClient::addTask(
cpp2::AdminCmd cmd,
cpp2::JobType type,
int32_t jobId,
int32_t taskId,
GraphSpaceID spaceId,
Expand All @@ -767,7 +767,7 @@ folly::Future<StatusOr<bool>> AdminClient::addTask(
auto adminAddr = Utils::getAdminAddrFromStoreAddr(host);

storage::cpp2::AddTaskRequest req;
req.cmd_ref() = cmd;
req.job_type_ref() = type;
req.job_id_ref() = jobId;
req.task_id_ref() = taskId;

Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/AdminClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class AdminClient {
* @param parts
* @return folly::Future<StatusOr<bool>> Return true if succeed, else return an error status
*/
virtual folly::Future<StatusOr<bool>> addTask(cpp2::AdminCmd cmd,
virtual folly::Future<StatusOr<bool>> addTask(cpp2::JobType jobType,
int32_t jobId,
int32_t taskId,
GraphSpaceID spaceId,
Expand Down
Loading

0 comments on commit c8f10c3

Please sign in to comment.