Skip to content

Commit

Permalink
Implement limit (#750)
Browse files Browse the repository at this point in the history
* Implement limit

* Rebase upstream

* Adrress wadeliuyi and whitewum's comments

* Rebase upstream

* Add vid and timestamp

* rebase upstream

* Adrress Simon Liu's comments

* rebase upstream

* Adrress Simon Liu's comments
  • Loading branch information
laura-ding authored and dangleptr committed Oct 22, 2019
1 parent e3ab67f commit 27663d6
Show file tree
Hide file tree
Showing 15 changed files with 433 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/graph/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ nebula_add_library(
MatchExecutor.cpp
DeleteVertexExecutor.cpp
FindPathExecutor.cpp
LimitExecutor.cpp
)

nebula_add_library(
Expand Down
4 changes: 4 additions & 0 deletions src/graph/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "graph/UpdateVertexExecutor.h"
#include "graph/UpdateEdgeExecutor.h"
#include "graph/FindPathExecutor.h"
#include "graph/LimitExecutor.h"

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -159,6 +160,9 @@ std::unique_ptr<Executor> Executor::makeExecutor(Sentence *sentence) {
case Sentence::Kind::kFindPath:
executor = std::make_unique<FindPathExecutor>(sentence, ectx());
break;
case Sentence::Kind::kLimit:
executor = std::make_unique<LimitExecutor>(sentence, ectx());
break;
case Sentence::Kind::kUnknown:
LOG(FATAL) << "Sentence kind unknown";
break;
Expand Down
142 changes: 142 additions & 0 deletions src/graph/LimitExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "base/Base.h"
#include "graph/LimitExecutor.h"

namespace nebula {
namespace graph {

LimitExecutor::LimitExecutor(Sentence *sentence, ExecutionContext *ectx) : TraverseExecutor(ectx) {
sentence_ = static_cast<LimitSentence*>(sentence);
}


Status LimitExecutor::prepare() {
offset_ = sentence_->offset();
if (offset_ < 0) {
return Status::SyntaxError("skip `%ld' is illegal", offset_);
}
count_ = sentence_->count();
if (count_ < 0) {
return Status::SyntaxError("count `%ld' is illegal", count_);
}

return Status::OK();
}


void LimitExecutor::execute() {
FLOG_INFO("Executing Limit: %s", sentence_->toString().c_str());
if (inputs_ == nullptr || count_ == 0) {
DCHECK(onFinish_);
onFinish_();
return;
}

auto ret = inputs_->getRows();
if (!ret.ok()) {
DCHECK(onFinish_);
onFinish_();
return;
}
auto inRows = std::move(ret).value();
if (inRows.size() > static_cast<uint64_t>(offset_ + count_)) {
rows_.resize(count_);
rows_.assign(std::make_move_iterator(inRows.begin()) + offset_,
std::make_move_iterator(inRows.begin()) + offset_ + count_);
} else if (inRows.size() > static_cast<uint64_t>(offset_) &&
inRows.size() <= static_cast<uint64_t>(offset_ + count_)) {
rows_.resize(inRows.size() - offset_);
rows_.assign(std::make_move_iterator(inRows.begin()) + offset_,
std::make_move_iterator(inRows.end()));
}

if (onResult_) {
auto output = setupInterimResult();
onResult_(std::move(output));
}

DCHECK(onFinish_);
onFinish_();
}


void LimitExecutor::feedResult(std::unique_ptr<InterimResult> result) {
if (result == nullptr) {
return;
}
inputs_ = std::move(result);
}


std::unique_ptr<InterimResult> LimitExecutor::setupInterimResult() {
if (rows_.empty()) {
return nullptr;
}

auto rsWriter = std::make_unique<RowSetWriter>(inputs_->schema());
using Type = cpp2::ColumnValue::Type;
for (auto &row : rows_) {
RowWriter writer(inputs_->schema());
auto columns = row.get_columns();
for (auto &column : columns) {
switch (column.getType()) {
case cpp2::ColumnValue::Type::id:
writer << column.get_id();
break;
case Type::integer:
writer << column.get_integer();
break;
case Type::double_precision:
writer << column.get_double_precision();
break;
case Type::bool_val:
writer << column.get_bool_val();
break;
case Type::str:
writer << column.get_str();
break;
case cpp2::ColumnValue::Type::timestamp:
writer << column.get_timestamp();
break;
default:
LOG(FATAL) << "Not Support: " << column.getType();
}
}
rsWriter->addRow(writer);
}

auto result = std::make_unique<InterimResult>(getResultColumnNames());
if (rsWriter != nullptr) {
result->setInterim(std::move(rsWriter));
}
return result;
}


std::vector<std::string> LimitExecutor::getResultColumnNames() const {
std::vector<std::string> columnNames;
columnNames.reserve(inputs_->schema()->getNumFields());
auto field = inputs_->schema()->begin();
while (field) {
columnNames.emplace_back(field->getName());
++field;
}
return columnNames;
}


void LimitExecutor::setupResponse(cpp2::ExecutionResponse &resp) {
resp.set_column_names(getResultColumnNames());
if (rows_.empty()) {
return;
}

resp.set_rows(std::move(rows_));
}
} // namespace graph
} // namespace nebula
46 changes: 46 additions & 0 deletions src/graph/LimitExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef GRAPH_LIMITEXECUTOR_H_
#define GRAPH_LIMITEXECUTOR_H_

#include "base/Base.h"
#include "graph/TraverseExecutor.h"

namespace nebula {
namespace graph {

class LimitExecutor final : public TraverseExecutor {
public:
LimitExecutor(Sentence *sentence, ExecutionContext *ectx);

const char* name() const override {
return "LimitExecutor";
}

Status MUST_USE_RESULT prepare() override;

void execute() override;

void feedResult(std::unique_ptr<InterimResult> result) override;

void setupResponse(cpp2::ExecutionResponse &resp) override;

private:
std::unique_ptr<InterimResult> setupInterimResult();
std::vector<std::string> getResultColumnNames() const;

private:
LimitSentence *sentence_{nullptr};
std::unique_ptr<InterimResult> inputs_;
std::vector<cpp2::RowValue> rows_;
int64_t offset_{-1};
int64_t count_{-1};
};
} // namespace graph
} // namespace nebula

#endif // GRAPH_LIMITEXECUTOR_H
4 changes: 4 additions & 0 deletions src/graph/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "graph/FindExecutor.h"
#include "graph/MatchExecutor.h"
#include "graph/FindPathExecutor.h"
#include "graph/LimitExecutor.h"

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -60,6 +61,9 @@ TraverseExecutor::makeTraverseExecutor(Sentence *sentence, ExecutionContext *ect
case Sentence::Kind::kFindPath:
executor = std::make_unique<FindPathExecutor>(sentence, ectx);
break;
case Sentence::Kind::kLimit:
executor = std::make_unique<LimitExecutor>(sentence, ectx);
break;
case Sentence::Kind::kUnknown:
LOG(FATAL) << "Sentence kind unknown";
break;
Expand Down
17 changes: 17 additions & 0 deletions src/graph/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,20 @@ nebula_add_test(
wangle
gtest
)

nebula_add_test(
NAME
group_by_limit_test
SOURCES
GroupByLimitTest.cpp
OBJECTS
$<TARGET_OBJECTS:graph_test_common_obj>
$<TARGET_OBJECTS:client_cpp_obj>
$<TARGET_OBJECTS:adHocSchema_obj>
${GRAPH_TEST_LIBS}
LIBRARIES
${THRIFT_LIBRARIES}
${ROCKSDB_LIBRARIES}
wangle
gtest
)
2 changes: 1 addition & 1 deletion src/graph/test/DataTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ AssertionResult DataTest::prepareSchema() {
<< " failed, error code "<< static_cast<int32_t>(code);
}
}
// Test same propName diff tyep in diff tags
// Test same propName diff type in diff tags
{
cpp2::ExecutionResponse resp;
std::string cmd = "CREATE TAG employee(name int)";
Expand Down
Loading

0 comments on commit 27663d6

Please sign in to comment.