Skip to content

Commit

Permalink
fix SimpleKVVerifyTool and StorageIntegrityTool
Browse files Browse the repository at this point in the history
  • Loading branch information
critical27 committed Nov 8, 2021
1 parent 6993dc4 commit 543e420
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 148 deletions.
2 changes: 1 addition & 1 deletion src/tools/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
nebula_add_subdirectory(storage-perf)
#nebula_add_subdirectory(simple-kv-verify)
nebula_add_subdirectory(simple-kv-verify)
nebula_add_subdirectory(meta-dump)
nebula_add_subdirectory(db-dump)
nebula_add_subdirectory(db-upgrade)
18 changes: 8 additions & 10 deletions src/tools/simple-kv-verify/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
$<TARGET_OBJECTS:storage_transaction_executor>
$<TARGET_OBJECTS:storage_common_obj>
$<TARGET_OBJECTS:kvstore_obj>
$<TARGET_OBJECTS:raftex_obj>
Expand All @@ -19,11 +20,12 @@ nebula_add_executable(
$<TARGET_OBJECTS:keyutils_obj>
$<TARGET_OBJECTS:meta_keyutils_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:storage_client_base_obj>
$<TARGET_OBJECTS:graph_storage_client_obj>
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:meta_client_obj>
$<TARGET_OBJECTS:file_based_cluster_id_man_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:raftex_thrift_obj>
Expand All @@ -43,20 +45,16 @@ nebula_add_executable(
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:encryption_obj>
$<TARGET_OBJECTS:ft_es_storage_adapter_obj>
$<TARGET_OBJECTS:version_obj>
$<TARGET_OBJECTS:ssl_obj>
$<TARGET_OBJECTS:geo_index_obj>
LIBRARIES
${ROCKSDB_LIBRARIES}
${THRIFT_LIBRARIES}
${PROXYGEN_LIBRARIES}
wangle
gtest
)

#install(
# TARGETS
# simple_kv_verify
# DESTINATION
# bin
# COMPONENT
# tool
#)
6 changes: 4 additions & 2 deletions src/tools/simple-kv-verify/SimpleKVVerifyTool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

#include <folly/executors/IOThreadPoolExecutor.h>
#include <thrift/lib/cpp/util/EnumUtils.h>

#include "clients/meta/MetaClient.h"
#include "clients/storage/GraphStorageClient.h"
Expand Down Expand Up @@ -107,8 +108,9 @@ class SimpleKVVerifyTool {
auto key = pair.first;
bool found = false;
for (const auto& result : resp.responses()) {
auto iter = result.key_values.find(key);
if (iter != result.key_values.end()) {
auto kvs = result.get_key_values();
auto iter = kvs.find(key);
if (iter != kvs.end()) {
if (iter->second != pairs[key]) {
LOG(ERROR) << "Check Fail: key = " << key << ", values: " << iter->second
<< " != " << pairs[key];
Expand Down
191 changes: 56 additions & 135 deletions src/tools/storage-perf/StorageIntegrityTool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,49 +13,33 @@
DEFINE_string(meta_server_addrs, "", "meta server address");
DEFINE_int32(io_threads, 10, "client io threads");

DEFINE_int32(partition_num, 1024, "partititon for space");
DEFINE_string(space_name, "test_space", "the space name");
DEFINE_string(tag_name, "test_tag", "the tag name");
DEFINE_string(prop_name, "test_prop", "the property name");

DEFINE_string(first_vertex_id, "1", "The smallest vertex id");
DEFINE_uint64(width, 100, "width of matrix");
DEFINE_uint64(height, 1000, "height of matrix");

DECLARE_int32(heartbeat_interval_secs);
DEFINE_string(first_key, "1", "the smallest key");
DEFINE_uint32(width, 100, "width of matrix");
DEFINE_uint32(height, 1000, "height of matrix");

namespace nebula {
namespace storage {

/**
* We generate a big circle of data, all node is the vertex, and the vertex have
* only one property of the next vertex, so we can validate them by traversing.
* We generate a big circle of data, all node are key/values, the value is the next node's key
* , so we can validate them by traversing.
*
* There are some gflags we need to pay attention:
* 1. The space's replica must be 1, because we don't have retry in
* StorageClient, we will update it after we suppport preheat. The tag must have
* only one int property, which is prop_name.
* 2. If the space and tag doesn't exists, it will try to create one, maybe you
* need to set heartbeat_interval_secs to make sure the storage service has load
* meta.
* 3. The width and height is the size of the big linked list, you can refer to
* the graph below. As expected, we can traverse the big linked list after width
* * height steps starting from any node in the list.
* The width and height is the size of the big linked list, you can refer to the graph below. As
* expected, we can traverse the big linked list after width * height steps starting from any node
* in the list.
*/
class IntegrityTest {
public:
IntegrityTest()
: propName_(FLAGS_prop_name),
width_{FLAGS_width},
height_{FLAGS_height},
firstVertexId_{FLAGS_first_vertex_id} {}
IntegrityTest() : width_{FLAGS_width}, height_{FLAGS_height}, firstKey_{FLAGS_first_key} {}

int run() {
if (!init()) {
return EXIT_FAILURE;
}
prepareData();
if (!validate(firstVertexId_, width_ * height_)) {
if (!validate(firstKey_, width_ * height_)) {
LOG(INFO) << "Integrity test failed";
return EXIT_FAILURE;
}
Expand All @@ -65,7 +49,12 @@ class IntegrityTest {

private:
bool init() {
FLAGS_heartbeat_interval_secs = 10;
if (static_cast<int64_t>(width_) * static_cast<int64_t>(height_) >
std::numeric_limits<int32_t>::max()) {
LOG(ERROR) << "Width * Height is out of range";
return false;
}

auto metaAddrsRet = nebula::network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
if (!metaAddrsRet.ok() || metaAddrsRet.value().empty()) {
LOG(ERROR) << "Can't get metaServer address, status: " << metaAddrsRet.status()
Expand All @@ -84,40 +73,12 @@ class IntegrityTest {

auto spaceResult = mClient_->getSpaceIdByNameFromCache(FLAGS_space_name);
if (!spaceResult.ok()) {
LOG(ERROR) << "Get spaceId failed, try to create one";
meta::cpp2::SpaceDesc spaceDesc;
spaceDesc.set_space_name(FLAGS_space_name);
spaceDesc.set_partition_num(FLAGS_partition_num);
spaceDesc.set_replica_factor(1);
auto ret = mClient_->createSpace(spaceDesc).get();
if (!ret.ok()) {
LOG(ERROR) << "Create space failed: " << ret.status();
return false;
}
spaceId_ = ret.value();
LOG(ERROR) << "Get spaceId failed";
return false;
} else {
spaceId_ = spaceResult.value();
}

auto tagResult = mClient_->getTagIDByNameFromCache(spaceId_, FLAGS_tag_name);
if (!tagResult.ok()) {
sleep(FLAGS_heartbeat_interval_secs + 1);
LOG(ERROR) << "Get tagId failed, try to create one: " << tagResult.status();
nebula::meta::cpp2::Schema schema;
nebula::meta::cpp2::ColumnDef column;
column.name = FLAGS_prop_name;
column.type.set_type(nebula::cpp2::PropertyType::INT64);
(*schema.columns_ref()).emplace_back(std::move(column));
auto ret = mClient_->createTagSchema(spaceId_, FLAGS_tag_name, schema).get();
if (!ret.ok()) {
LOG(ERROR) << "Create tag failed: " << ret.status();
return false;
}
tagId_ = ret.value();
} else {
tagId_ = tagResult.value();
}

client_ = std::make_unique<GraphStorageClient>(threadPool_, mClient_.get());
return true;
}
Expand Down Expand Up @@ -145,32 +106,31 @@ class IntegrityTest {
* |___________________________|
*/
void prepareData() {
std::vector<VertexID> first;
std::vector<VertexID> prev;
std::vector<VertexID> cur;
std::vector<std::string> first;
std::vector<std::string> prev;
std::vector<std::string> cur;

LOG(INFO) << "Start insert vertex";
LOG(INFO) << "Start insert kvs";
for (size_t i = 0; i < width_; i++) {
prev.emplace_back(std::to_string(std::atol(firstVertexId_.c_str()) + i));
prev.emplace_back(std::to_string(std::atoi(firstKey_.c_str()) + i));
}
// leave alone the first line, generate other lines
for (size_t i = 1; i < height_; i++) {
addVertex(prev, cur, std::to_string(std::atol(firstVertexId_.c_str() + i * width_)));
insertRow(prev, cur, std::to_string(std::atoi(firstKey_.c_str()) + i * width_));
prev = std::move(cur);
}
// shift the last line
std::rotate(prev.begin(), prev.end() - 1, prev.end());
// generate first line, each node in first line will points to a node in
// rotated last line, which will make the matrix a big linked list
addVertex(prev, first, firstVertexId_);
insertRow(prev, first, firstKey_);
LOG(INFO) << "Prepare data ok";
}

void addVertex(std::vector<VertexID>& prev, std::vector<VertexID>& cur, VertexID startId) {
std::unordered_map<TagID, std::vector<std::string>> propNames;
propNames[tagId_].emplace_back(propName_);
GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0);
auto future = client_->addVertices(param, genVertices(prev, cur, startId), propNames, true);
void insertRow(const std::vector<std::string>& prev,
std::vector<std::string>& cur,
const std::string& startId) {
auto future = client_->put(spaceId_, genKeyValue(prev, cur, startId));
auto resp = std::move(future).get();
if (!resp.succeeded()) {
for (auto& err : resp.failedParts()) {
Expand All @@ -180,82 +140,45 @@ class IntegrityTest {
}
}

std::vector<storage::cpp2::NewVertex> genVertices(std::vector<VertexID>& prev,
std::vector<VertexID>& cur,
VertexID startId) {
// We insert add vertices of a row once a time
std::vector<storage::cpp2::NewVertex> newVertices;
std::vector<KeyValue> genKeyValue(const std::vector<std::string>& prev,
std::vector<std::string>& cur,
const std::string& startId) {
// We insert key-values of a row once a time
std::vector<KeyValue> kvs;
for (size_t i = 0; i < width_; i++) {
VertexID vId;
vId = std::to_string(std::atol(startId.c_str()) + i);
cur.emplace_back(vId);

storage::cpp2::NewVertex v;
v.set_id(vId);
std::vector<nebula::storage::cpp2::NewTag> tags;
auto key = std::to_string(std::atoi(startId.c_str()) + i);
cur.emplace_back(key);
kvs.emplace_back(std::make_pair(cur[i], prev[i]));

storage::cpp2::NewTag tag;
tag.set_tag_id(tagId_);

std::vector<nebula::Value> props;
Value val(prev[i]);
props.emplace_back(val);
tag.set_props(props);
tags.emplace_back(std::move(tag));

v.set_tags(std::move(tags));
newVertices.emplace_back(std::move(v));
VLOG(2) << "Build " << cur[i] << " -> " << prev[i];
PLOG_EVERY_N(INFO, 10000) << "We have inserted "
<< std::atol(vId.c_str()) - std::atol(firstVertexId_.c_str()) -
width_
<< " vertices so far, total: " << width_ * height_;
LOG_EVERY_N(INFO, 10000) << "We have inserted "
<< std::atoi(key.c_str()) - std::atoi(firstKey_.c_str()) - width_
<< " key-value so far, total: " << width_ * height_;
}
return newVertices;
return kvs;
}

bool validate(VertexID startId, int64_t queryTimes) {
bool validate(const std::string& startId, int64_t queryTimes) {
int64_t count = 0;
VertexID nextId = startId;
std::string nextId = startId;
while (count < queryTimes) {
PLOG_EVERY_N(INFO, 1000) << "We have gone " << count << " steps so far";
// TODO support getProps
std::vector<cpp2::VertexProp> props;
cpp2::VertexProp tagProp;
tagProp.set_tag(tagId_);
(*tagProp.props_ref()).emplace_back(propName_);
DataSet dataset({kVid});
GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0);
auto future = client_->getProps(param, dataset, &props, nullptr, nullptr);
LOG_EVERY_N(INFO, 1000) << "We have gone " << count << " steps so far";
auto future = client_->get(spaceId_, {nextId});
auto resp = std::move(future).get();
if (!resp.succeeded()) {
LOG(ERROR) << "Failed to fetch props of vertex " << nextId;
LOG(ERROR) << "Failed to get value of " << nextId;
return false;
}
// TODO
#if 0
auto& results = resp.responses();
// get tag schema
auto* vschema = results[0].get_vertex_schema();
DCHECK(vschema != nullptr);
auto tagIter = vschema->find(tagId_);
DCHECK(tagIter != vschema->end());
auto tagProvider = std::make_shared<ResultSchemaProvider>(tagIter->second);

for (auto& vdata : results[0].vertices) {
auto iter = std::find_if(vdata.tag_data.begin(), vdata.tag_data.end(),
[this] (const auto& tagData) {
return tagData.tag_id == tagId_;
});
if (iter == vdata.tag_data.end()) {
return false;
}
auto tagReader = RowReaderWrapper::getRowReader(iter->data, tagProvider);
auto ret = RowReader::getPropByName(tagReader.get(), propName_);
CHECK(ok(ret));
nextId = boost::get<int64_t>(value(ret));
}
#endif
const auto& results = resp.responses();
DCHECK_EQ(results.size(), 1UL);
auto kvs = results[0].get_key_values();
auto iter = kvs.find(nextId);
if (iter == kvs.end()) {
LOG(ERROR) << "Value of " << nextId << " not found";
return false;
}
nextId = iter->second;
count++;
}
// after go to next node for width * height times, it should go back to
Expand All @@ -271,11 +194,9 @@ class IntegrityTest {
std::unique_ptr<meta::MetaClient> mClient_;
std::shared_ptr<folly::IOThreadPoolExecutor> threadPool_;
GraphSpaceID spaceId_;
TagID tagId_;
std::string propName_;
size_t width_;
size_t height_;
VertexID firstVertexId_;
std::string firstKey_;
};

} // namespace storage
Expand Down

0 comments on commit 543e420

Please sign in to comment.