Skip to content

Commit

Permalink
clean some code
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyu85cn committed Mar 28, 2022
1 parent 9ab0b7e commit 329cbd0
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 49 deletions.
1 change: 1 addition & 0 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const
auto newLeader = decodeHost(OP_TRANS_LEADER, log);
LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader;
preProcessTransLeader(newLeader);
LOG(INFO) << idStr_ << "~preprocess trans leader " << newLeader;
break;
}
case OP_ADD_PEER: {
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -850,8 +850,8 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
}
stats::StatsManager::addValue(kAppendWalLatencyUs, execTime_);
lastId = wal_->lastLogId();
VLOG(4) << idStr_ << "Succeeded writing logs [first, last] [" << iter.firstLogId() << ", "
<< lastId << "] to WAL";
VLOG(4) << idStr_ << "Succeeded writing logs [" << iter.firstLogId() << ", " << lastId
<< "] to WAL";
} while (false);

if (!checkAppendLogResult(res)) {
Expand Down
1 change: 0 additions & 1 deletion src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,6 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
std::string,
kvstore::MergeableAtomicOp,
folly::Promise<nebula::cpp2::ErrorCode>>;
// using LogCache = std::vector<LogCacheItem>;
using LogCache = std::deque<LogCacheItem>;

/****************************************************
Expand Down
26 changes: 0 additions & 26 deletions src/kvstore/test/NebulaStoreTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,6 @@ TEST(NebulaStoreTest, ReadSnapshotTest) {
batchHolder->put(key.data(), val.data());
expected.emplace_back(std::move(key), std::move(val));
}
// return encodeBatchValue(batchHolder->getBatch());
ret.code = nebula::cpp2::ErrorCode::SUCCEEDED;
ret.batch = encodeBatchValue(batchHolder->getBatch());
return ret;
Expand Down Expand Up @@ -951,16 +950,6 @@ TEST(NebulaStoreTest, AtomicOpBatchTest) {
// put kv
{
std::vector<std::pair<std::string, std::string>> expected, result;
// auto atomic = [&]() -> std::string {
// auto batchHolder = std::make_unique<kvstore::BatchHolder>();
// for (auto i = 0; i < 20; i++) {
// auto key = folly::stringPrintf("key_%d", i);
// auto val = folly::stringPrintf("val_%d", i);
// batchHolder->put(key.data(), val.data());
// expected.emplace_back(std::move(key), std::move(val));
// }
// return encodeBatchValue(batchHolder->getBatch());
// };

auto atomic = [&] {
kvstore::MergeableAtomicOpResult ret;
Expand Down Expand Up @@ -997,21 +986,6 @@ TEST(NebulaStoreTest, AtomicOpBatchTest) {
// put and remove
{
std::vector<std::pair<std::string, std::string>> expected, result;
// auto atomic = [&]() -> std::string {
// auto batchHolder = std::make_unique<kvstore::BatchHolder>();
// for (auto i = 0; i < 20; i++) {
// auto key = folly::stringPrintf("key_%d", i);
// auto val = folly::stringPrintf("val_%d", i);
// batchHolder->put(key.data(), val.data());
// if (i % 5 != 0) {
// expected.emplace_back(std::move(key), std::move(val));
// }
// }
// for (auto i = 0; i < 20; i = i + 5) {
// batchHolder->remove(folly::stringPrintf("key_%d", i));
// }
// return encodeBatchValue(batchHolder->getBatch());
// };

auto atomic = [&] {
kvstore::MergeableAtomicOpResult ret;
Expand Down
1 change: 0 additions & 1 deletion src/storage/admin/RebuildTagIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space
}
}

DLOG(INFO) << "====> items.size() = " << items.size() << ", data.size() = " << data.size();
for (const auto& item : items) {
if (item->get_schema_id().get_tag_id() == tagID) {
auto valuesRet = IndexKeyUtils::collectIndexValues(reader.get(), item.get(), schema);
Expand Down
1 change: 0 additions & 1 deletion src/storage/admin/StatsTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId,
}
tagsVertices[tagId] += 1;
tagIter->next();
spaceVertices++;
}

// Only stats valid edge data, no multi version
Expand Down
5 changes: 4 additions & 1 deletion src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) {
std::vector<kvstore::KV> kvs;
kvs.reserve(edges.size());
auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
deleteDupEdge(const_cast<std::vector<cpp2::NewEdge>&>(edges));
for (const auto& edge : edges) {
auto edgeKey = *edge.key_ref();
VLOG(3) << "PartitionID: " << partId << ", VertexID: " << *edgeKey.src_ref()
Expand All @@ -172,7 +173,8 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) {
spaceVidLen_, edgeKey.src_ref()->getStr(), edgeKey.dst_ref()->getStr())) {
LOG(ERROR) << "Space " << spaceId_ << " vertex length invalid, "
<< "space vid len: " << spaceVidLen_ << ", edge srcVid: " << *edgeKey.src_ref()
<< ", dstVid: " << *edgeKey.dst_ref();
<< ", dstVid: " << *edgeKey.dst_ref() << ", ifNotExists_: " << std::boolalpha
<< ifNotExists_;
code = nebula::cpp2::ErrorCode::E_INVALID_VID;
break;
}
Expand All @@ -191,6 +193,7 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) {
*edgeKey.ranking_ref(),
(*edgeKey.dst_ref()).getStr());
if (ifNotExists_ && !visited.emplace(key).second) {
LOG(INFO) << "skip " << edgeKey.src_ref()->getStr();
continue;
}

Expand Down
18 changes: 10 additions & 8 deletions src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) {
}

void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) {
LOG(INFO) << "AddVerticesProcessor::doProcess";
const auto& partVertices = req.get_parts();
const auto& propNamesMap = req.get_prop_names();
for (auto& part : partVertices) {
Expand Down Expand Up @@ -138,6 +137,8 @@ void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) {
void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& req) {
const auto& partVertices = req.get_parts();
const auto& propNamesMap = req.get_prop_names();
auto batchHolder = std::make_unique<kvstore::BatchHolder>();

for (const auto& part : partVertices) {
auto partId = part.first;
const auto& vertices = part.second;
Expand All @@ -157,7 +158,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
break;
}

// batchHolder->put(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid), "");
batchHolder->put(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid), "");
for (const auto& newTag : newTags) {
auto tagId = newTag.get_tag_id();
VLOG(3) << "PartitionID: " << partId << ", VertexID: " << vid << ", TagID: " << tagId;
Expand Down Expand Up @@ -192,8 +193,8 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleAsync(spaceId_, partId, code);
} else {
auto atomicOp = [partId, data = std::move(kvs), this]() mutable {
return addVerticesWithIndex(partId, std::move(data));
auto atomicOp = [&, partId, data = std::move(kvs)]() mutable {
return addVerticesWithIndex(partId, std::move(data), std::move(batchHolder));
};

auto cb = [partId, this](nebula::cpp2::ErrorCode ec) { handleAsync(spaceId_, partId, ec); };
Expand All @@ -203,11 +204,12 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
}

kvstore::MergeableAtomicOpResult AddVerticesProcessor::addVerticesWithIndex(
PartitionID partId, std::vector<kvstore::KV>&& data) {
PartitionID partId,
std::vector<kvstore::KV>&& data,
std::unique_ptr<kvstore::BatchHolder>&& batchHolder) {
kvstore::MergeableAtomicOpResult ret;
ret.code = nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED;
IndexCountWrapper wrapper(env_);
auto batchHolder = std::make_unique<kvstore::BatchHolder>();
for (auto& [key, value] : data) {
auto vId = NebulaKeyUtils::getVertexId(spaceVidLen_, key);
auto tagId = NebulaKeyUtils::getTagId(spaceVidLen_, key);
Expand Down Expand Up @@ -242,7 +244,7 @@ kvstore::MergeableAtomicOpResult AddVerticesProcessor::addVerticesWithIndex(
if (tagId == index->get_schema_id().get_tag_id()) {
// step 1, Delete old version index if exists.
if (oldReader != nullptr) {
auto oldIndexKeys = indexKeys(partId, vId.str(), oldReader.get(), index, nullptr);
auto oldIndexKeys = indexKeys(partId, vId.str(), oldReader.get(), index, schema.get());
if (!oldIndexKeys.empty()) {
// Check the index is building for the specified partition or
// not.
Expand All @@ -268,7 +270,7 @@ kvstore::MergeableAtomicOpResult AddVerticesProcessor::addVerticesWithIndex(

// step 2, Insert new vertex index
if (newReader != nullptr) {
auto newIndexKeys = indexKeys(partId, vId.str(), newReader.get(), index, nullptr);
auto newIndexKeys = indexKeys(partId, vId.str(), newReader.get(), index, schema.get());
if (!newIndexKeys.empty()) {
// check if index has ttl field, write it to index value if exists
auto field = CommonUtils::ttlValue(schema.get(), newReader.get());
Expand Down
6 changes: 4 additions & 2 deletions src/storage/mutate/AddVerticesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ class AddVerticesProcessor : public BaseProcessor<cpp2::ExecResponse> {

void deleteDupVid(std::vector<cpp2::NewVertex>& vertices);

kvstore::MergeableAtomicOpResult addVerticesWithIndex(PartitionID partId,
std::vector<kvstore::KV>&& data);
kvstore::MergeableAtomicOpResult addVerticesWithIndex(
PartitionID partId,
std::vector<kvstore::KV>&& data,
std::unique_ptr<kvstore::BatchHolder>&& batchHolder);

private:
GraphSpaceID spaceId_;
Expand Down
10 changes: 5 additions & 5 deletions src/storage/test/RebuildIndexTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ TEST_F(RebuildIndexTest, RebuildTagIndexCheckALLData) {

// Wait for the task finished
do {
// usleep(50);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
} while (!manager_->isFinished(context.jobId_, context.taskId_));

Expand Down Expand Up @@ -340,7 +339,6 @@ TEST_F(RebuildIndexTest, RebuildTagIndexWithAppend) {

// Wait for the task finished
do {
// usleep(50);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
} while (!manager_->isFinished(context.jobId_, context.taskId_));

Expand All @@ -353,7 +351,11 @@ TEST_F(RebuildIndexTest, RebuildTagIndexWithAppend) {

RebuildIndexTest::env_->rebuildIndexGuard_->clear();
writer->stop();
sleep(1);

for (int i = 1; i <= 5; ++i) {
LOG(INFO) << "sleep for " << i << "s";
sleep(1);
}
}

TEST_F(RebuildIndexTest, RebuildTagIndex) {
Expand Down Expand Up @@ -510,7 +512,6 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndexWithAppend) {

// Wait for the task finished
do {
// usleep(50);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
} while (!manager_->isFinished(context.jobId_, context.taskId_));

Expand Down Expand Up @@ -559,7 +560,6 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndex) {

// Wait for the task finished
do {
// usleep(50);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
} while (!manager_->isFinished(context.jobId_, context.taskId_));

Expand Down
3 changes: 1 addition & 2 deletions src/storage/test/StatsTaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ TEST_F(StatsTaskTest, StatsTagAndEdgeData) {
ASSERT_EQ(0, edge.second);
}

// ASSERT_EQ(81, *statsItem.space_vertices_ref());
EXPECT_EQ(81, *statsItem.space_vertices_ref());
ASSERT_EQ(81, *statsItem.space_vertices_ref());
ASSERT_EQ(0, *statsItem.space_edges_ref());
}

Expand Down

0 comments on commit 329cbd0

Please sign in to comment.