Skip to content

Commit

Permalink
Merge branch 'master' into fix_unwind_crash
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 authored Dec 9, 2021
2 parents 0205821 + 67bb6ff commit 7cb24f9
Show file tree
Hide file tree
Showing 58 changed files with 1,808 additions and 159 deletions.
10 changes: 5 additions & 5 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,15 +558,15 @@ StorageRpcRespFuture<cpp2::GetNeighborsResponse> StorageClient::lookupAndTravers
});
}

StorageRpcRespFuture<cpp2::ScanEdgeResponse> StorageClient::scanEdge(
StorageRpcRespFuture<cpp2::ScanResponse> StorageClient::scanEdge(
const CommonRequestParam& param,
const cpp2::EdgeProp& edgeProp,
const std::vector<cpp2::EdgeProp>& edgeProp,
int64_t limit,
const Expression* filter) {
std::unordered_map<HostAddr, cpp2::ScanEdgeRequest> requests;
auto status = getHostPartsWithCursor(param.space);
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ScanEdgeResponse>>(
return folly::makeFuture<StorageRpcResponse<cpp2::ScanResponse>>(
std::runtime_error(status.status().toString()));
}
auto& clusters = status.value();
Expand All @@ -589,15 +589,15 @@ StorageRpcRespFuture<cpp2::ScanEdgeResponse> StorageClient::scanEdge(
const cpp2::ScanEdgeRequest& r) { return client->future_scanEdge(r); });
}

StorageRpcRespFuture<cpp2::ScanVertexResponse> StorageClient::scanVertex(
StorageRpcRespFuture<cpp2::ScanResponse> StorageClient::scanVertex(
const CommonRequestParam& param,
const std::vector<cpp2::VertexProp>& vertexProp,
int64_t limit,
const Expression* filter) {
std::unordered_map<HostAddr, cpp2::ScanVertexRequest> requests;
auto status = getHostPartsWithCursor(param.space);
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ScanVertexResponse>>(
return folly::makeFuture<StorageRpcResponse<cpp2::ScanResponse>>(
std::runtime_error(status.status().toString()));
}
auto& clusters = status.value();
Expand Down
10 changes: 5 additions & 5 deletions src/clients/storage/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ class StorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsyncCli
StorageRpcRespFuture<cpp2::GetNeighborsResponse> lookupAndTraverse(
const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec);

StorageRpcRespFuture<cpp2::ScanEdgeResponse> scanEdge(const CommonRequestParam& param,
const cpp2::EdgeProp& vertexProp,
int64_t limit,
const Expression* filter);
StorageRpcRespFuture<cpp2::ScanResponse> scanEdge(const CommonRequestParam& param,
const std::vector<cpp2::EdgeProp>& vertexProp,
int64_t limit,
const Expression* filter);

StorageRpcRespFuture<cpp2::ScanVertexResponse> scanVertex(
StorageRpcRespFuture<cpp2::ScanResponse> scanVertex(
const CommonRequestParam& param,
const std::vector<cpp2::VertexProp>& vertexProp,
int64_t limit,
Expand Down
18 changes: 9 additions & 9 deletions src/common/graph/ExecutionResponseOps-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct TccStructTraits<::nebula::ExecutionResponse> {
_ftype = apache::thrift::protocol::T_I32;
} else if (_fname == "latency_in_us") {
fid = 2;
_ftype = apache::thrift::protocol::T_I32;
_ftype = apache::thrift::protocol::T_I64;
} else if (_fname == "data") {
fid = 3;
_ftype = apache::thrift::protocol::T_STRUCT;
Expand Down Expand Up @@ -75,9 +75,9 @@ uint32_t Cpp2Ops<::nebula::ExecutionResponse>::write(Protocol* proto,
::nebula::ErrorCode>::write(*proto,
obj->errorCode);
xfer += proto->writeFieldEnd();
xfer += proto->writeFieldBegin("latency_in_us", apache::thrift::protocol::T_I32, 2);
xfer += proto->writeFieldBegin("latency_in_us", apache::thrift::protocol::T_I64, 2);
xfer += ::apache::thrift::detail::pm::protocol_methods<::apache::thrift::type_class::integral,
int32_t>::write(*proto, obj->latencyInUs);
int64_t>::write(*proto, obj->latencyInUs);
xfer += proto->writeFieldEnd();
if (obj->data != nullptr) {
xfer += proto->writeFieldBegin("data", apache::thrift::protocol::T_STRUCT, 3);
Expand Down Expand Up @@ -134,7 +134,7 @@ _readField_error_code : {
}
_readField_latency_in_us : {
::apache::thrift::detail::pm::protocol_methods<::apache::thrift::type_class::integral,
int32_t>::read(*proto, obj->latencyInUs);
int64_t>::read(*proto, obj->latencyInUs);
isset_latency_in_us = true;
}

Expand Down Expand Up @@ -216,7 +216,7 @@ _readField_comment : {
}
}
case 2: {
if (LIKELY(_readState.fieldType == apache::thrift::protocol::T_I32)) {
if (LIKELY(_readState.fieldType == apache::thrift::protocol::T_I64)) {
goto _readField_latency_in_us;
} else {
goto _skip;
Expand Down Expand Up @@ -276,9 +276,9 @@ uint32_t Cpp2Ops<::nebula::ExecutionResponse>::serializedSize(
xfer += ::apache::thrift::detail::pm::protocol_methods<
::apache::thrift::type_class::enumeration,
::nebula::ErrorCode>::serializedSize<false>(*proto, obj->errorCode);
xfer += proto->serializedFieldSize("latency_in_us", apache::thrift::protocol::T_I32, 2);
xfer += proto->serializedFieldSize("latency_in_us", apache::thrift::protocol::T_I64, 2);
xfer += ::apache::thrift::detail::pm::
protocol_methods<::apache::thrift::type_class::integral, int32_t>::serializedSize<false>(
protocol_methods<::apache::thrift::type_class::integral, int64_t>::serializedSize<false>(
*proto, obj->latencyInUs);
if (obj->data != nullptr) {
xfer += proto->serializedFieldSize("data", apache::thrift::protocol::T_STRUCT, 3);
Expand Down Expand Up @@ -314,9 +314,9 @@ uint32_t Cpp2Ops<::nebula::ExecutionResponse>::serializedSizeZC(
xfer += ::apache::thrift::detail::pm::protocol_methods<
::apache::thrift::type_class::enumeration,
::nebula::ErrorCode>::serializedSize<false>(*proto, obj->errorCode);
xfer += proto->serializedFieldSize("latency_in_us", apache::thrift::protocol::T_I32, 2);
xfer += proto->serializedFieldSize("latency_in_us", apache::thrift::protocol::T_I64, 2);
xfer += ::apache::thrift::detail::pm::
protocol_methods<::apache::thrift::type_class::integral, int32_t>::serializedSize<false>(
protocol_methods<::apache::thrift::type_class::integral, int64_t>::serializedSize<false>(
*proto, obj->latencyInUs);
if (obj->data != nullptr) {
xfer += proto->serializedFieldSize("data", apache::thrift::protocol::T_STRUCT, 3);
Expand Down
2 changes: 1 addition & 1 deletion src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ struct ExecutionResponse {
}

ErrorCode errorCode{ErrorCode::SUCCEEDED};
int32_t latencyInUs{0};
int64_t latencyInUs{0};
std::unique_ptr<nebula::DataSet> data{nullptr};
std::unique_ptr<std::string> spaceName{nullptr};
std::unique_ptr<std::string> errorMsg{nullptr};
Expand Down
12 changes: 12 additions & 0 deletions src/common/meta/SchemaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,17 @@ StatusOr<std::pair<bool, int32_t>> SchemaManager::getSchemaIDByName(GraphSpaceID
return Status::Error("Schema not exist: %s", schemaName.str().c_str());
}

StatusOr<std::unordered_map<TagID, std::string>> SchemaManager::getAllTags(GraphSpaceID space) {
std::unordered_map<TagID, std::string> tags;
auto tagSchemas = getAllLatestVerTagSchema(space);
NG_RETURN_IF_ERROR(tagSchemas);
for (auto& tagSchema : tagSchemas.value()) {
auto tagName = toTagName(space, tagSchema.first);
NG_RETURN_IF_ERROR(tagName);
tags.emplace(tagSchema.first, tagName.value());
}
return tags;
}

} // namespace meta
} // namespace nebula
2 changes: 2 additions & 0 deletions src/common/meta/SchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class SchemaManager {

virtual StatusOr<std::vector<std::string>> getAllEdge(GraphSpaceID space) = 0;

StatusOr<std::unordered_map<TagID, std::string>> getAllTags(GraphSpaceID space);

// get all version of all tag schema
virtual StatusOr<TagSchemas> getAllVerTagSchema(GraphSpaceID space) = 0;

Expand Down
2 changes: 2 additions & 0 deletions src/graph/context/ast/CypherAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ struct ScanInfo {
std::vector<IndexID> indexIds;
// use for seek by edge only
MatchEdge::Direction direction{MatchEdge::Direction::OUT_EDGE};
// use for scan seek
bool anyLabel{false};
};

struct CypherClauseContextBase : AstContext {
Expand Down
2 changes: 2 additions & 0 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ nebula_add_library(
query/InnerJoinExecutor.cpp
query/IndexScanExecutor.cpp
query/AssignExecutor.cpp
query/ScanVerticesExecutor.cpp
query/ScanEdgesExecutor.cpp
query/TraverseExecutor.cpp
query/AppendVerticesExecutor.cpp
algo/ConjunctPathExecutor.cpp
Expand Down
8 changes: 8 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
#include "graph/executor/query/MinusExecutor.h"
#include "graph/executor/query/ProjectExecutor.h"
#include "graph/executor/query/SampleExecutor.h"
#include "graph/executor/query/ScanEdgesExecutor.h"
#include "graph/executor/query/ScanVerticesExecutor.h"
#include "graph/executor/query/SortExecutor.h"
#include "graph/executor/query/TopNExecutor.h"
#include "graph/executor/query/TraverseExecutor.h"
Expand Down Expand Up @@ -170,6 +172,12 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kGetVertices: {
return pool->add(new GetVerticesExecutor(node, qctx));
}
case PlanNode::Kind::kScanEdges: {
return pool->add(new ScanEdgesExecutor(node, qctx));
}
case PlanNode::Kind::kScanVertices: {
return pool->add(new ScanVerticesExecutor(node, qctx));
}
case PlanNode::Kind::kGetNeighbors: {
return pool->add(new GetNeighborsExecutor(node, qctx));
}
Expand Down
3 changes: 2 additions & 1 deletion src/graph/executor/query/GetPropExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ class GetPropExecutor : public StorageAccessExecutor {
GetPropExecutor(const std::string &name, const PlanNode *node, QueryContext *qctx)
: StorageAccessExecutor(name, node, qctx) {}

Status handleResp(storage::StorageRpcResponse<storage::cpp2::GetPropResponse> &&rpcResp,
template <typename Response>
Status handleResp(storage::StorageRpcResponse<Response> &&rpcResp,
const std::vector<std::string> &colNames) {
auto result = handleCompleteness(rpcResp, FLAGS_accept_partial_success);
NG_RETURN_IF_ERROR(result);
Expand Down
50 changes: 50 additions & 0 deletions src/graph/executor/query/ScanEdgesExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "graph/executor/query/ScanEdgesExecutor.h"

#include "common/time/ScopedTimer.h"
#include "graph/context/QueryContext.h"
#include "graph/planner/plan/Query.h"
#include "graph/util/SchemaUtil.h"

using nebula::storage::StorageClient;
using nebula::storage::StorageRpcResponse;
using nebula::storage::cpp2::ScanResponse;

namespace nebula {
namespace graph {

folly::Future<Status> ScanEdgesExecutor::execute() { return scanEdges(); }

folly::Future<Status> ScanEdgesExecutor::scanEdges() {
SCOPED_TIMER(&execTime_);
StorageClient *client = qctx()->getStorageClient();
auto *se = asNode<ScanEdges>(node());
if (se->limit() < 0) {
return Status::Error("Scan edges must specify limit number.");
}

time::Duration scanEdgesTime;
StorageClient::CommonRequestParam param(se->space(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
qctx()->plan()->isProfileEnabled());
return DCHECK_NOTNULL(client)
->scanEdge(param, *DCHECK_NOTNULL(se->props()), se->limit(), se->filter())
.via(runner())
.ensure([this, scanEdgesTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc", folly::sformat("{}(us)", scanEdgesTime.elapsedInUSec()));
})
.thenValue([this, se](StorageRpcResponse<ScanResponse> &&rpcResp) {
SCOPED_TIMER(&execTime_);
addStats(rpcResp, otherStats_);
return handleResp(std::move(rpcResp), se->colNames());
});
}

} // namespace graph
} // namespace nebula
22 changes: 22 additions & 0 deletions src/graph/executor/query/ScanEdgesExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "graph/executor/query/GetPropExecutor.h"

namespace nebula {
namespace graph {
class ScanEdgesExecutor final : public GetPropExecutor {
public:
ScanEdgesExecutor(const PlanNode *node, QueryContext *qctx)
: GetPropExecutor("ScanEdgesExecutor", node, qctx) {}

folly::Future<Status> execute() override;

private:
folly::Future<Status> scanEdges();
};

} // namespace graph
} // namespace nebula
50 changes: 50 additions & 0 deletions src/graph/executor/query/ScanVerticesExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "graph/executor/query/ScanVerticesExecutor.h"

#include "common/time/ScopedTimer.h"
#include "graph/context/QueryContext.h"
#include "graph/util/SchemaUtil.h"

using nebula::storage::StorageClient;
using nebula::storage::StorageRpcResponse;
using nebula::storage::cpp2::ScanResponse;

namespace nebula {
namespace graph {

folly::Future<Status> ScanVerticesExecutor::execute() { return scanVertices(); }

folly::Future<Status> ScanVerticesExecutor::scanVertices() {
SCOPED_TIMER(&execTime_);

auto *sv = asNode<ScanVertices>(node());
if (sv->limit() < 0) {
return Status::Error("Scan vertices must specify limit number.");
}
StorageClient *storageClient = qctx()->getStorageClient();

time::Duration scanVertexTime;
StorageClient::CommonRequestParam param(sv->space(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
qctx()->plan()->isProfileEnabled());
return DCHECK_NOTNULL(storageClient)
->scanVertex(param, *DCHECK_NOTNULL(sv->props()), sv->limit(), sv->filter())
.via(runner())
.ensure([this, scanVertexTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc", folly::sformat("{}(us)", scanVertexTime.elapsedInUSec()));
})
.thenValue([this, sv](StorageRpcResponse<ScanResponse> &&rpcResp) {
SCOPED_TIMER(&execTime_);
addStats(rpcResp, otherStats_);
return handleResp(std::move(rpcResp), sv->colNames());
});
}

} // namespace graph
} // namespace nebula
26 changes: 26 additions & 0 deletions src/graph/executor/query/ScanVerticesExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#pragma once

#include "graph/executor/query/GetPropExecutor.h"
#include "graph/planner/plan/Query.h"

namespace nebula {
namespace graph {

class ScanVerticesExecutor final : public GetPropExecutor {
public:
ScanVerticesExecutor(const PlanNode *node, QueryContext *qctx)
: GetPropExecutor("ScanVerticesExecutor", node, qctx) {}

folly::Future<Status> execute() override;

private:
folly::Future<Status> scanVertices();
};

} // namespace graph
} // namespace nebula
5 changes: 5 additions & 0 deletions src/graph/optimizer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ nebula_add_library(
rule/PushFilterDownAggregateRule.cpp
rule/PushFilterDownProjectRule.cpp
rule/PushFilterDownLeftJoinRule.cpp
rule/PushFilterDownScanVerticesRule.cpp
rule/PushVFilterDownScanVerticesRule.cpp
rule/OptimizeEdgeIndexScanByFilterRule.cpp
rule/OptimizeTagIndexScanByFilterRule.cpp
rule/UnionAllIndexScanBaseRule.cpp
Expand All @@ -45,6 +47,9 @@ nebula_add_library(
rule/PushLimitDownEdgeIndexPrefixScanRule.cpp
rule/PushLimitDownEdgeIndexRangeScanRule.cpp
rule/PushLimitDownProjectRule.cpp
rule/PushLimitDownScanAppendVerticesRule.cpp
rule/GetEdgesTransformRule.cpp
rule/PushLimitDownScanEdgesAppendVerticesRule.cpp
)

nebula_add_subdirectory(test)
Loading

0 comments on commit 7cb24f9

Please sign in to comment.