Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug mq #1

Open
wants to merge 5 commits into
base: cachelib
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions cmake/FindCacheLib.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# - Try to find CacheLib include dirs and libraries
#
# Usage of this module as follows:
#
#     find_package(CacheLib)
#
# Variables used by this module, they can change the default behaviour and need
# to be set before calling find_package:
#
# Variables defined by this module:
#
#  CacheLib_FOUND          System has CacheLib, include and lib dirs found
#  CacheLib_INCLUDE_DIR    The CacheLib includes directories.
#  CacheLib_LIBRARIES      The CacheLib libraries.

find_path(CacheLib_INCLUDE_DIR NAMES cachelib)
find_library(CacheLib_ALLOCATOR NAMES libcachelib_allocator.a)
find_library(CacheLib_DATATYPE NAMES libcachelib_datatype.a)
find_library(CacheLib_SHM NAMES libcachelib_shm.a)
find_library(CacheLib_COMMON NAMES libcachelib_common.a)
find_library(CacheLib_NAVY NAMES libcachelib_navy.a)

# Let CMake do the found/not-found bookkeeping.  Unlike an unconditional
# message(FATAL_ERROR ...), this honors the QUIET/REQUIRED arguments passed
# to find_package() and prints a standard, consistent failure message.
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(CacheLib
  REQUIRED_VARS
    CacheLib_INCLUDE_DIR
    CacheLib_ALLOCATOR
    CacheLib_DATATYPE
    CacheLib_SHM
    CacheLib_COMMON
    CacheLib_NAVY)

if(CacheLib_FOUND)
  # Order matters when linking static archives: dependents come first.
  set(CacheLib_LIBRARIES
      ${CacheLib_DATATYPE}
      ${CacheLib_COMMON}
      ${CacheLib_SHM}
      ${CacheLib_ALLOCATOR}
      ${CacheLib_NAVY})
  # Only cache variables can be marked advanced; CacheLib_LIBRARIES is a
  # derived (non-cache) variable, so mark the find_library results instead.
  mark_as_advanced(
    CacheLib_INCLUDE_DIR
    CacheLib_ALLOCATOR
    CacheLib_DATATYPE
    CacheLib_SHM
    CacheLib_COMMON
    CacheLib_NAVY)
endif()
1 change: 1 addition & 0 deletions cmake/nebula/NebulaCMakeMacros.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ macro(nebula_link_libraries target)
${target}
${ARGN}
folly
iberty
fmt
glog
gflags
Expand Down
2 changes: 2 additions & 0 deletions cmake/nebula/ThirdPartyConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ find_package(Sodium REQUIRED)
if (${CMAKE_HOST_SYSTEM_PROCESSOR} MATCHES "x86_64")
find_package(Breakpad REQUIRED)
endif()
find_package(CacheLib REQUIRED)

set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -L ${NEBULA_THIRDPARTY_ROOT}/lib")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -L ${NEBULA_THIRDPARTY_ROOT}/lib64")
Expand Down Expand Up @@ -138,6 +139,7 @@ set(PROXYGEN_LIBRARIES
)

set(ROCKSDB_LIBRARIES ${Rocksdb_LIBRARY})
set(CACHELIB_LIBRARIES ${CacheLib_LIBRARIES})

# All compression libraries
set(COMPRESSION_LIBRARIES bz2 snappy zstd z lz4)
Expand Down
18 changes: 18 additions & 0 deletions conf/nebula-graphd.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,21 @@
--enable_experimental_feature=false

--enable_space_level_metrics=false

############## graph cache ##############
# Whether to enable graph cache
--enable_graph_cache=false
# Total capacity reserved for the graph in-memory cache, in MB
--graph_cache_capacity=0
# Number of buckets in base 2 logarithm. E.g., in case of 10, the total number of buckets will be 2^10.
--graph_cache_buckets_power=10
# Number of locks in base 2 logarithm. E.g., in case of 5, the total number of locks will be 2^5.
--graph_cache_locks_power=5
# Edge pool size in MB
--edge_pool_capacity=50
# Vertex pool size in MB
--vertex_pool_capacity=50
# Vertex item ttl in seconds
--vertex_item_ttl=300
# Edge item ttl in seconds
--edge_item_ttl=300
17 changes: 17 additions & 0 deletions conf/nebula-graphd.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,20 @@
--enable_experimental_feature=false

--enable_space_level_metrics=false
############## graph cache ##############
# Whether to enable graph cache
--enable_graph_cache=false
# Total capacity reserved for the graph in-memory cache, in MB
--graph_cache_capacity=0
# Number of buckets in base 2 logarithm. E.g., in case of 10, the total number of buckets will be 2^10.
--graph_cache_buckets_power=10
# Number of locks in base 2 logarithm. E.g., in case of 5, the total number of locks will be 2^5.
--graph_cache_locks_power=5
# Edge pool size in MB
--edge_pool_capacity=50
# Vertex pool size in MB
--vertex_pool_capacity=50
# Vertex item ttl in seconds
--vertex_item_ttl=300
# Edge item ttl in seconds
--edge_item_ttl=300
12 changes: 12 additions & 0 deletions conf/nebula-storaged.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,15 @@
--rocksdb_column_family_options={"write_buffer_size":"67108864","max_write_buffer_number":"4","max_bytes_for_level_base":"268435456"}
# rocksdb BlockBasedTableOptions in json, each name and value of option is string, given as "option_name":"option_value" separated by comma
--rocksdb_block_based_table_options={"block_size":"8192"}

############## storage cache ##############
# Whether to enable storage cache
--enable_storage_cache=false
# Total capacity reserved for the storage in-memory cache, in MB
--storage_cache_capacity=0
# Number of buckets in base 2 logarithm. E.g., in case of 10, the total number of buckets will be 2^10.
--storage_cache_buckets_power=10
# Number of locks in base 2 logarithm. E.g., in case of 5, the total number of locks will be 2^5.
--storage_cache_locks_power=5
# Vertex pool size in MB
--vertex_pool_capacity=50
12 changes: 12 additions & 0 deletions conf/nebula-storaged.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,18 @@
# Whether to garbage collect blobs during compaction
--rocksdb_enable_blob_garbage_collection=true

############## storage cache ##############
# Whether to enable storage cache
--enable_storage_cache=false
# Total capacity reserved for the storage in-memory cache, in MB
--storage_cache_capacity=0
# Number of buckets in base 2 logarithm. E.g., in case of 10, the total number of buckets will be 2^10.
--storage_cache_buckets_power=10
# Number of locks in base 2 logarithm. E.g., in case of 5, the total number of locks will be 2^5.
--storage_cache_locks_power=5
# Vertex pool size in MB
--vertex_pool_capacity=50

############### misc ####################
--snapshot_part_rate_limit=10485760
--snapshot_batch_size=1048576
Expand Down
6 changes: 5 additions & 1 deletion src/clients/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,9 @@ nebula_add_library(
InternalStorageClient.cpp
)

nebula_add_subdirectory(stats)
nebula_add_library(
storage_client_cache_obj OBJECT
StorageClientCache.cpp
)

nebula_add_subdirectory(stats)
13 changes: 13 additions & 0 deletions src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <folly/Try.h>

#include "clients/storage/StorageClientCache.h"
#include "clients/storage/stats/StorageClientStats.h"
#include "common/ssl/SSLConfig.h"
#include "common/stats/StatsManager.h"
Expand Down Expand Up @@ -75,6 +76,7 @@ StorageClientBase<ClientType>::StorageClientBase(
std::shared_ptr<folly::IOThreadPoolExecutor> threadPool, meta::MetaClient* metaClient)
: metaClient_(metaClient), ioThreadPool_(threadPool) {
clientsMan_ = std::make_unique<thrift::ThriftClientManager<ClientType>>(FLAGS_enable_ssl);
clientCache_ = std::make_unique<nebula::graph::StorageClientCache>();
}

template <typename ClientType>
Expand Down Expand Up @@ -124,11 +126,21 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClientBase<ClientType>::c
DCHECK(!!ioThreadPool_);

for (auto& req : requests) {
if constexpr (std::is_same_v<Request, GetNeighborsRequest>) {
if constexpr (std::is_same_v<Response, GetNeighborsResponse>) {
auto cacheResp = clientCache_->getCacheValue(req.second);
if (cacheResp.ok()) {
context->resp.addResponse(std::move(cacheResp.value()));
continue;
}
}
}
auto& host = req.first;
auto spaceId = req.second.get_space_id();
auto res = context->insertRequest(host, std::move(req.second));
DCHECK(res.second);
evb = ioThreadPool_->getEventBase();

// Invoke the remote method
folly::via(evb, [this, evb, context, host, spaceId, res]() mutable {
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
Expand Down Expand Up @@ -170,6 +182,7 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClientBase<ClientType>::c
auto latency = result.get_latency_in_us();
context->resp.setLatency(host, latency, time::WallClock::fastNowInMicroSec() - start);

// (TODO) insert response into graph cache
// Keep the response
context->resp.addResponse(std::move(resp));
})
Expand Down
2 changes: 2 additions & 0 deletions src/clients/storage/StorageClientBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <folly/futures/Future.h>

#include "clients/meta/MetaClient.h"
#include "clients/storage/StorageClientCache.h"
#include "common/base/Base.h"
#include "common/base/StatusOr.h"
#include "common/meta/Common.h"
Expand Down Expand Up @@ -221,6 +222,7 @@ class StorageClientBase {
private:
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
std::unique_ptr<thrift::ThriftClientManager<ClientType>> clientsMan_;
std::unique_ptr<nebula::graph::StorageClientCache> clientCache_;
};

} // namespace storage
Expand Down
169 changes: 169 additions & 0 deletions src/clients/storage/StorageClientCache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "clients/storage/StorageClientCache.h"

namespace nebula {
namespace graph {

// Bind the process-wide GraphCache singleton and lay down the two fixed
// leading result columns of a GetNeighbors dataset.
StorageClientCache::StorageClientCache() {
  cache_ = GraphCache::instance();
  // Column 0 holds the vertex id; column 1 is reserved for stats.
  resultDataSet_.colNames.push_back(nebula::kVid);
  resultDataSet_.colNames.push_back("_stats");
}

// Try to answer a GetNeighbors request entirely from the graph cache.
// Returns a synthesized GetNeighborsResponse when every (vid, edgeType) pair
// requested is present in the cache; returns an error status on the first
// cache miss or when the request shape is not cacheable.
StatusOr<GetNeighborsResponse> StorageClientCache::getCacheValue(const GetNeighborsRequest& req) {
  // Reset per-request state.  These members survive across calls (and
  // resultDataSet_ is left moved-from on a successful lookup), so stale edge
  // types and duplicated column names from a previous request would corrupt
  // this lookup.
  edgeTypes_.clear();
  resultDataSet_ = nebula::DataSet();
  resultDataSet_.colNames.emplace_back(nebula::kVid);
  resultDataSet_.colNames.emplace_back("_stats");

  NG_RETURN_IF_ERROR(checkCondition(req));

  NG_RETURN_IF_ERROR(buildEdgeContext(req.get_traverse_spec()));

  for (const auto& partEntry : req.get_parts()) {
    for (const auto& entry : partEntry.second) {
      CHECK_GE(entry.values.size(), 1);
      auto vId = entry.values[0].getStr();
      std::vector<Value> row;
      /*
       * Format is
       * |_vid|_stats|_edge:like:_dst|_edge:-like:_dst|_edge:serve:_dst|_expr|
       * |"a" | NULL |[["b"],["c"]]  |[["d"],["e"]]   |[["f"], ["g"]]  | NULL|
       *
       * first column is vertexId, second column is reserved for stats
       */
      row.emplace_back(vId);
      row.emplace_back(Value());

      for (const auto edgeType : edgeTypes_) {
        const auto& eKey = edgeKey(vId, edgeType);
        auto status = cache_->getEdges(eKey);
        // A miss on any single key means the whole request goes to storage.
        NG_RETURN_IF_ERROR(status);
        auto dstRes = status.value();

        // Wrap each destination id in a single-prop list so the cell matches
        // the storage response layout documented above.
        nebula::List dstList;
        dstList.values.reserve(dstRes.size());
        for (const auto& dst : dstRes) {
          nebula::List propList;
          propList.values.emplace_back(std::move(dst));
          dstList.values.emplace_back(std::move(propList));
        }
        nebula::List cell;
        cell.values.emplace_back(std::move(dstList));
        row.emplace_back(std::move(cell));
      }
      // Trailing _expr column is unused.
      row.emplace_back(Value());
      resultDataSet_.rows.emplace_back(std::move(row));
    }
  }
  GetNeighborsResponse resp;
  resp.vertices_ref() = std::move(resultDataSet_);
  return resp;
}

// Write the edge destination lists of a storage GetNeighbors response into
// the graph cache, keyed by (srcVid, edgeType).
// Relies on edgeTypes_ having been populated by a prior buildEdgeContext()
// call for the matching request; the dataset columns are expected to be
// |_vid|_stats|<one cell per edge type ...>|_expr|.
void StorageClientCache::insertResultIntoCache(GetNeighborsResponse& resp) {
  auto dataset = resp.get_vertices();
  if (dataset == nullptr) {
    LOG(INFO) << "GraphCache Empty dataset in response";
    return;
  }
  // list.values.emplace_back(std::move(*dataset));
  // ResultBuilder builder;
  // builder.state(Result::State::kSuccess);
  // builder.value(Value(std::move(list))).iter(Iterator::Kind::kGetNeighbors);

  // Expected column count: _vid + _stats + _expr plus one per edge type.
  auto colSize = dataset->colNames.size();
  if (edgeTypes_.size() + 3 != colSize) {
    LOG(INFO) << "RPC Dataset colsize error";
    return;
  }

  auto& rows = (*dataset).rows;
  for (const auto& row : rows) {
    // first column is vertexId
    auto vId = row.values[0].getStr();
    // Columns [2, size-1) are the per-edge-type cells; columns 0/1 are
    // _vid/_stats and the last column is _expr.
    for (size_t i = 2; i < row.values.size() - 1; ++i) {
      auto edgeType = edgeTypes_[i - 2];
      auto& cell = row.values[i];
      // DCHECK(nebula::List, cell);
      auto& cellList = cell.getList();
      std::vector<std::string> edgeDsts;
      edgeDsts.reserve(cellList.size());
      // NOTE(review): getCacheValue() builds each cell as a list of
      // single-element prop Lists, so each element here may itself be a List
      // rather than a plain string; getStr() would then be wrong — confirm
      // the actual storage response layout.
      for (const auto& dst : cellList.values) {
        edgeDsts.emplace_back(dst.getStr());
      }
      // Best-effort insert: a cache failure is logged, not propagated.
      if (!cache_->addAllEdges(edgeKey(vId, edgeType), edgeDsts)) {
        LOG(INFO) << "Cache vid : " << vId << " edgeType: " << edgeType << " fail.";
      }
    }
  }
  LOG(INFO) << "Insert Result Into Cache Success";
}

// Build the cache key for a vertex tag.
// Layout: <vertex id bytes> followed by the raw little-endian TagID bytes.
std::string StorageClientCache::tagKey(const std::string& vId, TagID tagId) {
  std::string result;
  result.reserve(vId.size() + sizeof(TagID));
  result.append(vId);
  result.append(reinterpret_cast<const char*>(&tagId), sizeof(TagID));
  return result;
}

// Build the cache key for an out-edge list.
// Layout: <source vertex id bytes> followed by the raw EdgeType bytes.
std::string StorageClientCache::edgeKey(const std::string& srcVid, EdgeType type) {
  std::string result;
  result.reserve(srcVid.size() + sizeof(EdgeType));
  result.append(srcVid);
  result.append(reinterpret_cast<const char*>(&type), sizeof(EdgeType));
  return result;
}

// Decide whether a GetNeighbors request can be served from the cache.
// The cache stores plain destination-id expansions only, so any filter,
// sampling, or limit clause disqualifies the request.
Status StorageClientCache::checkCondition(const GetNeighborsRequest& req) {
  if (req.get_traverse_spec().filter_ref().has_value()) {
    return Status::Error("GetNeighbor contain filter expression");
  }
  if ((*req.traverse_spec_ref()).random_ref().has_value()) {
    return Status::Error("GetNeighbor contain random");
  }
  // Bug fix: this branch previously re-checked random_ref(), so requests
  // carrying a limit were incorrectly considered cacheable.
  if ((*req.traverse_spec_ref()).limit_ref().has_value()) {
    return Status::Error("GetNeighbor contain limit");
  }
  if (!req.get_traverse_spec().vertex_props_ref().has_value()) {
    return Status::Error("GetNeighbor does not contain vertexProps");
  }
  return Status::OK();
}

// Collect the edge types requested by the traverse spec into edgeTypes_ and
// append one "_edge:<+|-><name>:_dst" column name per edge type (plus the
// trailing "_expr" column) to resultDataSet_.
// Only requests whose edge props are exactly {kDst} are cacheable, because
// the cache stores destination ids and nothing else.
Status StorageClientCache::buildEdgeContext(const TraverseSpec& req) {
  if (!req.edge_props_ref().has_value()) {
    return Status::Error("GetNeighbor does not contain edgeProps");
  }
  auto edgeProps = *req.edge_props_ref();
  for (const auto& edgeProp : edgeProps) {
    auto edgeType = edgeProp.get_type();
    edgeTypes_.push_back(edgeType);
    // TODO getEdgeName throught edgeSchema — "test" is a placeholder until
    // the edge-schema name lookup is wired in, so column names are wrong
    // for now.
    std::string edgeName = "test";
    std::string colName = "_edge:";
    // A negative edge type denotes the reverse edge direction.
    colName.append(edgeType > 0 ? "+" : "-").append(edgeName);
    if ((*edgeProp.props_ref()).empty()) {
      // The request must ask for at least the dst prop.
      return Status::Error("%s 's prop is nullptr", edgeName.c_str());
    }
    for (const auto& name : (*edgeProp.props_ref())) {
      // Reject any prop other than dst: the cache can only serve the
      // destination-id column, so extra props make the request uncacheable.
      if (name != nebula::kDst) {
        return Status::Error("Edge : %s contain %s", edgeName.c_str(), name.c_str());
      }
    }
    // todo fix it
    colName += ":" + std::string(nebula::kDst);
    resultDataSet_.colNames.emplace_back(std::move(colName));
  }
  resultDataSet_.colNames.emplace_back("_expr");

  return Status::OK();
}

} // namespace graph
} // namespace nebula
Loading