Skip to content

Commit

Permalink
store admintasks' status and send the status to metad when starting
Browse files Browse the repository at this point in the history
  • Loading branch information
liwenhui-soul committed Sep 13, 2021
1 parent af5d38d commit f62eb9d
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 9 deletions.
2 changes: 2 additions & 0 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class KVEngine {
const std::string& prefix,
std::unique_ptr<KVIterator>* iter) = 0;

virtual nebula::cpp2::ErrorCode scan(std::unique_ptr<KVIterator>* storageIter) = 0;

// Write a single record
virtual nebula::cpp2::ErrorCode put(std::string key, std::string value) = 0;

Expand Down
9 changes: 9 additions & 0 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,15 @@ nebula::cpp2::ErrorCode RocksEngine::rangeWithPrefix(const std::string& start,
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode RocksEngine::scan(std::unique_ptr<KVIterator>* storageIter) {
rocksdb::ReadOptions options;
options.total_order_seek = true;
rocksdb::Iterator* iter = db_->NewIterator(options);
iter->SeekToFirst();
storageIter->reset(new RocksCommonIter(iter));
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode RocksEngine::put(std::string key, std::string value) {
rocksdb::WriteOptions options;
options.disableWAL = FLAGS_rocksdb_disable_wal;
Expand Down
25 changes: 25 additions & 0 deletions src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,30 @@ class RocksPrefixIter : public KVIterator {
rocksdb::Slice prefix_;
};

class RocksCommonIter : public KVIterator {
public:
explicit RocksCommonIter(rocksdb::Iterator* iter) : iter_(iter) {}

~RocksCommonIter() = default;

bool valid() const override { return !!iter_ && iter_->Valid(); }

void next() override { iter_->Next(); }

void prev() override { iter_->Prev(); }

folly::StringPiece key() const override {
return folly::StringPiece(iter_->key().data(), iter_->key().size());
}

folly::StringPiece val() const override {
return folly::StringPiece(iter_->value().data(), iter_->value().size());
}

protected:
std::unique_ptr<rocksdb::Iterator> iter_;
};

/**************************************************************************
*
* An implementation of KVEngine based on Rocksdb
Expand Down Expand Up @@ -128,6 +152,7 @@ class RocksEngine : public KVEngine {
const std::string& prefix,
std::unique_ptr<KVIterator>* iter) override;

nebula::cpp2::ErrorCode scan(std::unique_ptr<KVIterator>* storageIter) override;
/*********************
* Data modification
********************/
Expand Down
2 changes: 2 additions & 0 deletions src/storage/CommonUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "common/stats/StatsManager.h"
#include "common/utils/MemoryLockWrapper.h"
#include "interface/gen-cpp2/storage_types.h"
#include "kvstore/KVEngine.h"
#include "kvstore/KVStore.h"

namespace nebula {
Expand Down Expand Up @@ -77,6 +78,7 @@ class StorageEnv {
TransactionManager* txnMan_{nullptr};
std::unique_ptr<VerticesMemLock> verticesML_{nullptr};
std::unique_ptr<EdgesMemLock> edgesML_{nullptr};
std::unique_ptr<kvstore::KVEngine> adminStore_{nullptr};

IndexState getIndexState(GraphSpaceID space, PartitionID part) {
auto key = std::make_tuple(space, part);
Expand Down
21 changes: 15 additions & 6 deletions src/storage/StorageServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "common/thread/GenericThreadPool.h"
#include "common/utils/Utils.h"
#include "kvstore/PartManager.h"
#include "kvstore/RocksEngine.h"
#include "storage/BaseProcessor.h"
#include "storage/CompactionFilter.h"
#include "storage/GraphStorageServiceHandler.h"
Expand Down Expand Up @@ -109,6 +110,13 @@ bool StorageServer::initWebService() {
return status.ok();
}

std::unique_ptr<kvstore::KVEngine> StorageServer::getAdmintSoreInstance() {
int32_t vIdLen = sizeof(JobID) + sizeof(TaskID);
std::unique_ptr<kvstore::KVEngine> re(
new kvstore::RocksEngine(0, vIdLen, dataPaths_[0], walPath_));
return re;
}

bool StorageServer::start() {
ioThreadPool_ = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);
workers_ = apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager(
Expand Down Expand Up @@ -153,12 +161,6 @@ bool StorageServer::start() {
return false;
}

taskMgr_ = AdminTaskManager::instance();
if (!taskMgr_->init()) {
LOG(ERROR) << "Init task manager failed!";
return false;
}

env_ = std::make_unique<storage::StorageEnv>();
env_->kvstore_ = kvstore_.get();
env_->indexMan_ = indexMan_.get();
Expand All @@ -171,6 +173,13 @@ bool StorageServer::start() {

env_->verticesML_ = std::make_unique<VerticesMemLock>();
env_->edgesML_ = std::make_unique<EdgesMemLock>();
env_->adminStore_ = getAdmintSoreInstance();

taskMgr_ = AdminTaskManager::instance(env_.get());
if (!taskMgr_->init()) {
LOG(ERROR) << "Init task manager failed!";
return false;
}

storageThread_.reset(new std::thread([this] {
try {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/StorageServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class StorageServer final {
private:
std::unique_ptr<kvstore::KVStore> getStoreInstance();

std::unique_ptr<kvstore::KVEngine> getAdmintSoreInstance();

bool initWebService();

std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
Expand Down
94 changes: 94 additions & 0 deletions src/storage/admin/AdminTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "storage/admin/AdminTaskManager.h"

#include "storage/admin/AdminTask.h"
#include "storage/admin/AdminTaskProcessor.h"

DEFINE_uint32(max_concurrent_subtasks, 10, "The sub tasks could be invoked simultaneously");

Expand All @@ -25,10 +26,75 @@ bool AdminTaskManager::init() {

bgThread_->addTask(&AdminTaskManager::schedule, this);
shutdown_ = false;
handleUnreportedTasks();
LOG(INFO) << "exit AdminTaskManager::init()";
return true;
}

void AdminTaskManager::handleUnreportedTasks() {
if (env_ == nullptr) return;
std::unique_ptr<kvstore::KVIterator> iter;
auto kvRet = env_->adminStore_->scan(&iter);
if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED || iter == nullptr) return;
std::vector<std::string> keys;
for (; iter->valid(); iter->next()) {
folly::StringPiece key = iter->key();
JobID jobId = *reinterpret_cast<const JobID*>(key.data());
TaskID taskId = *reinterpret_cast<const TaskID*>(key.data() + sizeof(JobID));
keys.emplace_back(key.data(), key.size());
folly::StringPiece val = iter->val();
nebula::meta::cpp2::JobStatus jobStatus =
*reinterpret_cast<const nebula::meta::cpp2::JobStatus*>(val.data() +
sizeof(meta::cpp2::AdminCmd));
nebula::cpp2::ErrorCode errCode;
nebula::meta::cpp2::StatsItem statsItem;
if (jobStatus != nebula::meta::cpp2::JobStatus::FINISHED) {
errCode = nebula::cpp2::ErrorCode::E_TASK_EXECUTION_FAILED;
statsItem.set_status(nebula::meta::cpp2::JobStatus::FAILED);
} else {
errCode = nebula::cpp2::ErrorCode::SUCCEEDED;
statsItem.set_status(nebula::meta::cpp2::JobStatus::FINISHED);
}
meta::cpp2::StatsItem* pStats = &statsItem;

LOG(INFO) << folly::sformat("reportTaskFinish(), job={}, task={}, rc={}",
jobId,
taskId,
apache::thrift::util::enumNameSafe(errCode));
auto maxRetry = 5;
auto retry = 0;
while (retry++ < maxRetry) {
auto rc = nebula::cpp2::ErrorCode::SUCCEEDED;
auto fut = env_->metaClient_->reportTaskFinish(jobId, taskId, errCode, pStats);
fut.wait();
if (!fut.hasValue()) {
LOG(INFO) << folly::sformat(
"reportTaskFinish() got rpc error:, job={}, task={}", jobId, taskId);
continue;
}
if (!fut.value().ok()) {
LOG(INFO) << folly::sformat("reportTaskFinish() has bad status:, job={}, task={}, rc={}",
jobId,
taskId,
fut.value().status().toString());
break;
}
rc = fut.value().value();
LOG(INFO) << folly::sformat("reportTaskFinish(), job={}, task={}, rc={}",
jobId,
taskId,
apache::thrift::util::enumNameSafe(rc));
if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED ||
rc == nebula::cpp2::ErrorCode::E_STORE_FAILURE) {
continue;
} else {
break;
}
}
}
env_->adminStore_->multiRemove(keys);
}

void AdminTaskManager::addAsyncTask(std::shared_ptr<AdminTask> task) {
TaskHandle handle = std::make_pair(task->getJobId(), task->getTaskId());
auto ret = tasks_.insert(handle, task).second;
Expand Down Expand Up @@ -85,6 +151,31 @@ void AdminTaskManager::shutdown() {
LOG(INFO) << "exit AdminTaskManager::shutdown()";
}

void AdminTaskManager::saveTaskStatus(JobID jobId,
TaskID taskId,
meta::cpp2::AdminCmd cmd,
nebula::meta::cpp2::JobStatus jobStatus) {
if (env_ == nullptr) return;
std::string key;
key.reserve(sizeof(JobID) + sizeof(TaskID));
key.append(reinterpret_cast<char*>(&jobId), sizeof(JobID));
key.append(reinterpret_cast<char*>(&taskId), sizeof(TaskID));
std::string val;
val.reserve(sizeof(nebula::meta::cpp2::JobStatus) + sizeof(meta::cpp2::AdminCmd));
val.append(reinterpret_cast<char*>(&cmd), sizeof(meta::cpp2::AdminCmd));
val.append(reinterpret_cast<char*>(&jobStatus), sizeof(nebula::meta::cpp2::JobStatus));
env_->adminStore_->put(key, val);
}

void AdminTaskManager::removeTaskStatus(JobID jobId, TaskID taskId) {
if (env_ == nullptr) return;
std::string key;
key.reserve(sizeof(JobID) + sizeof(TaskID));
key.append(reinterpret_cast<char*>(&jobId), sizeof(JobID));
key.append(reinterpret_cast<char*>(&taskId), sizeof(TaskID));
env_->adminStore_->remove(key);
}

// schedule
void AdminTaskManager::schedule() {
std::chrono::milliseconds interval{20}; // 20ms
Expand Down Expand Up @@ -117,6 +208,7 @@ void AdminTaskManager::schedule() {
task->getJobId(),
apache::thrift::util::enumNameSafe(nebula::error(errOrSubTasks)));
task->finish(nebula::error(errOrSubTasks));
removeTaskStatus(task->getJobId(), task->getTaskId());
tasks_.erase(handle);
continue;
}
Expand All @@ -134,6 +226,7 @@ void AdminTaskManager::schedule() {
if (0 == subTasks.size()) {
FLOG_INFO("task(%d, %d) finished, no subtask", task->getJobId(), task->getTaskId());
task->finish();
removeTaskStatus(task->getJobId(), task->getTaskId());
tasks_.erase(handle);
continue;
}
Expand Down Expand Up @@ -180,6 +273,7 @@ void AdminTaskManager::runSubTask(TaskHandle handle) {
unFinishedSubTask);
if (0 == unFinishedSubTask) {
task->finish();
removeTaskStatus(task->getJobId(), task->getTaskId());
tasks_.erase(handle);
} else {
pool_->add(std::bind(&AdminTaskManager::runSubTask, this, handle));
Expand Down
15 changes: 13 additions & 2 deletions src/storage/admin/AdminTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ class AdminTaskManager {
using TaskQueue = folly::UnboundedBlockingQueue<TaskHandle>;

AdminTaskManager() = default;
static AdminTaskManager* instance() {
static AdminTaskManager sAdminTaskManager;
explicit AdminTaskManager(storage::StorageEnv* env = nullptr) : env_(env) {}
static AdminTaskManager* instance(storage::StorageEnv* env = nullptr) {
static AdminTaskManager sAdminTaskManager(env);
return &sAdminTaskManager;
}

Expand All @@ -51,6 +52,15 @@ class AdminTaskManager {

bool isFinished(JobID jobID, TaskID taskID);

void saveTaskStatus(JobID jobId,
TaskID taskId,
meta::cpp2::AdminCmd cmd,
nebula::meta::cpp2::JobStatus jobStatus);

void removeTaskStatus(JobID jobId, TaskID taskId);

void handleUnreportedTasks();

private:
void schedule();
void runSubTask(TaskHandle handle);
Expand All @@ -61,6 +71,7 @@ class AdminTaskManager {
TaskContainer tasks_;
TaskQueue taskQueue_;
std::unique_ptr<thread::GenericWorker> bgThread_;
storage::StorageEnv* env_{nullptr};
};

} // namespace storage
Expand Down
3 changes: 2 additions & 1 deletion src/storage/admin/AdminTaskProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

namespace nebula {
namespace storage {

void AdminTaskProcessor::process(const cpp2::AddAdminTaskRequest& req) {
auto taskManager = AdminTaskManager::instance();

Expand Down Expand Up @@ -64,6 +63,8 @@ void AdminTaskProcessor::process(const cpp2::AddAdminTaskRequest& req) {
TaskContext ctx(req, std::move(cb));
auto task = AdminTaskFactory::createAdminTask(env_, std::move(ctx));
if (task) {
taskManager->saveTaskStatus(
ctx.jobId_, ctx.taskId_, ctx.cmd_, nebula::meta::cpp2::JobStatus::RUNNING);
taskManager->addAsyncTask(task);
} else {
cpp2::PartitionResult thriftRet;
Expand Down

0 comments on commit f62eb9d

Please sign in to comment.