-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathAdminTaskProcessor.cpp
82 lines (73 loc) · 2.9 KB
/
AdminTaskProcessor.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "storage/admin/AdminTaskProcessor.h"
#include <thrift/lib/cpp/util/EnumUtils.h>
#include "interface/gen-cpp2/common_types.h"
#include "storage/admin/AdminTaskManager.h"
namespace nebula {
namespace storage {
/**
 * Handles an AddAdminTaskRequest.
 *
 * Builds a completion callback that reports the task outcome to meta through
 * MetaClient::reportTaskFinish(), wraps the request and the callback in a TaskContext, and asks
 * AdminTaskFactory to create the task. If a task is created, its status is saved as RUNNING and
 * it is queued on the AdminTaskManager; otherwise E_INVALID_TASK_PARA is recorded in codes_.
 * onFinished() is called in both cases.
 *
 * @param req The admin task request; its job id and task id are copied into the callback.
 */
void AdminTaskProcessor::process(const cpp2::AddAdminTaskRequest& req) {
  auto taskManager = AdminTaskManager::instance();

  // Callback stored in the TaskContext (presumably invoked by the task once it completes --
  // confirm against AdminTask). Everything is captured by value so the callback does not depend
  // on `req` or on this processor's members after process() returns.
  auto cb = [env = env_, jobId = req.get_job_id(), taskId = req.get_task_id()](
                nebula::cpp2::ErrorCode errCode, nebula::meta::cpp2::StatsItem& result) {
    // Stats are forwarded only when the task succeeded and its status is FINISHED;
    // otherwise a null pointer is passed to reportTaskFinish().
    meta::cpp2::StatsItem* pStats = nullptr;
    if (errCode == nebula::cpp2::ErrorCode::SUCCEEDED &&
        *result.status_ref() == nebula::meta::cpp2::JobStatus::FINISHED) {
      pStats = &result;
    }

    LOG(INFO) << folly::sformat("reportTaskFinish(), job={}, task={}, rc={}",
                                jobId,
                                taskId,
                                apache::thrift::util::enumNameSafe(errCode));

    // At most kMaxRetry attempts are made. Only a missing future value or a
    // leader-change / store-failure code triggers another attempt.
    constexpr int kMaxRetry = 5;
    for (int attempt = 0; attempt < kMaxRetry; ++attempt) {
      auto fut = env->metaClient_->reportTaskFinish(jobId, taskId, errCode, pStats);
      fut.wait();  // blocks the calling thread until the report completes

      if (!fut.hasValue()) {
        LOG(INFO) << folly::sformat(
            "reportTaskFinish() got rpc error:, job={}, task={}", jobId, taskId);
        continue;
      }

      if (!fut.value().ok()) {
        // A non-OK status is not retried.
        LOG(INFO) << folly::sformat("reportTaskFinish() has bad status:, job={}, task={}, rc={}",
                                    jobId,
                                    taskId,
                                    fut.value().status().toString());
        break;
      }

      const nebula::cpp2::ErrorCode rc = fut.value().value();
      LOG(INFO) << folly::sformat("reportTaskFinish(), job={}, task={}, rc={}",
                                  jobId,
                                  taskId,
                                  apache::thrift::util::enumNameSafe(rc));

      const bool retriable = rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED ||
                             rc == nebula::cpp2::ErrorCode::E_STORE_FAILURE;
      if (!retriable) {
        break;
      }
    }
  };

  TaskContext ctx(req, std::move(cb));

  // Copy the identifying fields BEFORE ctx is handed over with std::move below: reading members
  // of a moved-from object relies on TaskContext's move behaviour and must not be depended on.
  const auto ctxJobId = ctx.jobId_;
  const auto ctxTaskId = ctx.taskId_;
  const auto ctxCmd = ctx.cmd_;

  auto task = AdminTaskFactory::createAdminTask(env_, std::move(ctx));
  if (task) {
    taskManager->saveTaskStatus(
        ctxJobId, ctxTaskId, ctxCmd, nebula::meta::cpp2::JobStatus::RUNNING);
    taskManager->addAsyncTask(task);
  } else {
    // The factory returned no task for this request: report invalid parameters.
    cpp2::PartitionResult thriftRet;
    thriftRet.set_code(nebula::cpp2::ErrorCode::E_INVALID_TASK_PARA);
    codes_.emplace_back(std::move(thriftRet));
  }
  onFinished();
}
// Stores the given stats in the response. `result` is passed on as an rvalue, so callers should
// not rely on its contents after this call.
void AdminTaskProcessor::onProcessFinished(nebula::meta::cpp2::StatsItem& result) {
  auto& stats = result;
  resp_.set_stats(std::move(stats));
}
} // namespace storage
} // namespace nebula